// risingwave_frontend/test_utils.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::io::Write;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};

use futures_async_stream::for_await;
use parking_lot::RwLock;
use pgwire::net::{Address, AddressRef};
use pgwire::pg_response::StatementType;
use pgwire::pg_server::{BoxedError, SessionId, SessionManager, UserAuthenticator};
use pgwire::types::Row;
use risingwave_common::catalog::{
    AlterDatabaseParam, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER,
    DEFAULT_SUPER_USER_ID, FunctionId, IndexId, NON_RESERVED_USER_ID, ObjectId,
    PG_CATALOG_SCHEMA_NAME, RW_CATALOG_SCHEMA_NAME, TableId,
};
use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat};
use risingwave_common::session_config::SessionConfig;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::util::cluster_limit::ClusterLimit;
use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{HummockVersionId, INVALID_VERSION_ID};
use risingwave_pb::backup_service::MetaSnapshotMetadata;
use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
use risingwave_pb::catalog::{
    PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbStreamJobStatus,
    PbSubscription, PbTable, PbView, Table,
};
use risingwave_pb::common::WorkerNode;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::{
    DdlProgress, PbTableJobType, ReplaceJobPlan, TableJobType, alter_name_request,
    alter_set_schema_request, alter_swap_rename_request, create_connection_request,
};
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::{
    BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::{
    EventLog, FragmentDistribution, PbTableParallelism, PbThrottleTarget, RecoveryStatus,
    SystemParams,
};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
use risingwave_pb::user::update_user_request::UpdateField;
use risingwave_pb::user::{GrantPrivilege, UserInfo};
use risingwave_rpc_client::error::Result as RpcResult;
use tempfile::{Builder, NamedTempFile};

use crate::FrontendOpts;
use crate::catalog::catalog_service::CatalogWriter;
use crate::catalog::root_catalog::Catalog;
use crate::catalog::{ConnectionId, DatabaseId, SchemaId, SecretId};
use crate::error::{ErrorCode, Result};
use crate::handler::RwPgResponse;
use crate::meta_client::FrontendMetaClient;
use crate::scheduler::HummockSnapshotManagerRef;
use crate::session::{AuthContext, FrontendEnv, SessionImpl};
use crate::user::UserId;
use crate::user::user_manager::UserInfoManager;
use crate::user::user_service::UserInfoWriter;

/// An embedded frontend that starts neither a meta node nor a frontend TCP server.
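///
/// A minimal usage sketch (hypothetical test code; assumes `FrontendOpts` can
/// be constructed via `Default` in the test environment):
///
/// ```ignore
/// let frontend = LocalFrontend::new(FrontendOpts::default()).await;
/// let response = frontend.run_sql("CREATE TABLE t (v INT)").await.unwrap();
/// ```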
pub struct LocalFrontend {
    pub opts: FrontendOpts,
    env: FrontendEnv,
}

impl SessionManager for LocalFrontend {
    type Session = SessionImpl;

    fn create_dummy_session(
        &self,
        _database_id: u32,
        _user_name: u32,
    ) -> std::result::Result<Arc<Self::Session>, BoxedError> {
        unreachable!()
    }

    fn connect(
        &self,
        _database: &str,
        _user_name: &str,
        _peer_addr: AddressRef,
    ) -> std::result::Result<Arc<Self::Session>, BoxedError> {
        Ok(self.session_ref())
    }

    fn cancel_queries_in_session(&self, _session_id: SessionId) {
        unreachable!()
    }

    fn cancel_creating_jobs_in_session(&self, _session_id: SessionId) {
        unreachable!()
    }

    fn end_session(&self, _session: &Self::Session) {
        unreachable!()
    }
}

impl LocalFrontend {
    #[expect(clippy::unused_async)]
    pub async fn new(opts: FrontendOpts) -> Self {
        let env = FrontendEnv::mock();
        Self { opts, env }
    }

    pub async fn run_sql(
        &self,
        sql: impl Into<String>,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        self.session_ref().run_statement(sql, vec![]).await
    }

    pub async fn run_sql_with_session(
        &self,
        session_ref: Arc<SessionImpl>,
        sql: impl Into<String>,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        session_ref.run_statement(sql, vec![]).await
    }

    pub async fn run_user_sql(
        &self,
        sql: impl Into<String>,
        database: String,
        user_name: String,
        user_id: UserId,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        self.session_user_ref(database, user_name, user_id)
            .run_statement(sql, vec![])
            .await
    }

    pub async fn query_formatted_result(&self, sql: impl Into<String>) -> Vec<String> {
        let mut rsp = self.run_sql(sql).await.unwrap();
        let mut res = vec![];
        #[for_await]
        for row_set in rsp.values_stream() {
            for row in row_set.unwrap() {
                res.push(format!("{:?}", row));
            }
        }
        res
    }

    pub async fn get_explain_output(&self, sql: impl Into<String>) -> String {
        let mut rsp = self.run_sql(sql).await.unwrap();
        assert_eq!(rsp.stmt_type(), StatementType::EXPLAIN);
        let mut res = String::new();
        #[for_await]
        for row_set in rsp.values_stream() {
            for row in row_set.unwrap() {
                let row: Row = row;
                let row = row.values()[0].as_ref().unwrap();
                res += std::str::from_utf8(row).unwrap();
                res += "\n";
            }
        }
        res
    }

    /// Creates a new session with the default database and super user.
    pub fn session_ref(&self) -> Arc<SessionImpl> {
        self.session_user_ref(
            DEFAULT_DATABASE_NAME.to_owned(),
            DEFAULT_SUPER_USER.to_owned(),
            DEFAULT_SUPER_USER_ID,
        )
    }

    pub fn session_user_ref(
        &self,
        database: String,
        user_name: String,
        user_id: UserId,
    ) -> Arc<SessionImpl> {
        Arc::new(SessionImpl::new(
            self.env.clone(),
            AuthContext::new(database, user_name, user_id),
            UserAuthenticator::None,
            // The local frontend uses a placeholder session id.
            (0, 0),
            Address::Tcp(SocketAddr::new(
                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
                6666,
            ))
            .into(),
            Default::default(),
        ))
    }
}

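/// Free-function variant of [`LocalFrontend::get_explain_output`] for callers
/// that already hold an [`RwPgResponse`]. Panics if the response did not come
/// from an `EXPLAIN` statement.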
pub async fn get_explain_output(mut rsp: RwPgResponse) -> String {
    if rsp.stmt_type() != StatementType::EXPLAIN {
        panic!("RESPONSE INVALID: {rsp:?}");
    }
    let mut res = String::new();
    #[for_await]
    for row_set in rsp.values_stream() {
        for row in row_set.unwrap() {
            let row: Row = row;
            let row = row.values()[0].as_ref().unwrap();
            res += std::str::from_utf8(row).unwrap();
            res += "\n";
        }
    }
    res
}

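/// A mock [`CatalogWriter`] that applies DDL directly to an in-memory
/// [`Catalog`] instead of going through the meta service. Object ids come from
/// a single atomic counter; the two maps record each object's schema and each
/// schema's database so that drops can resolve `(database_id, schema_id)`.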
pub struct MockCatalogWriter {
    catalog: Arc<RwLock<Catalog>>,
    id: AtomicU32,
    table_id_to_schema_id: RwLock<HashMap<u32, SchemaId>>,
    schema_id_to_database_id: RwLock<HashMap<u32, DatabaseId>>,
    hummock_snapshot_manager: HummockSnapshotManagerRef,
}

#[async_trait::async_trait]
impl CatalogWriter for MockCatalogWriter {
    async fn create_database(
        &self,
        db_name: &str,
        owner: UserId,
        resource_group: &str,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
    ) -> Result<()> {
        let database_id = self.gen_id();
        self.catalog.write().create_database(&PbDatabase {
            name: db_name.to_owned(),
            id: database_id,
            owner,
            resource_group: resource_group.to_owned(),
            barrier_interval_ms,
            checkpoint_frequency,
        });
        self.create_schema(database_id, DEFAULT_SCHEMA_NAME, owner)
            .await?;
        self.create_schema(database_id, PG_CATALOG_SCHEMA_NAME, owner)
            .await?;
        self.create_schema(database_id, RW_CATALOG_SCHEMA_NAME, owner)
            .await?;
        Ok(())
    }

    async fn create_schema(
        &self,
        db_id: DatabaseId,
        schema_name: &str,
        owner: UserId,
    ) -> Result<()> {
        let id = self.gen_id();
        self.catalog.write().create_schema(&PbSchema {
            id,
            name: schema_name.to_owned(),
            database_id: db_id,
            owner,
        });
        self.add_schema_id(id, db_id);
        Ok(())
    }

    async fn create_materialized_view(
        &self,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
        _dependencies: HashSet<ObjectId>,
        _specific_resource_group: Option<String>,
        _if_not_exists: bool,
    ) -> Result<()> {
        table.id = self.gen_id();
        table.stream_job_status = PbStreamJobStatus::Created as _;
        table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
        self.catalog.write().create_table(&table);
        self.add_table_or_source_id(table.id, table.schema_id, table.database_id);
        self.hummock_snapshot_manager
            .add_table_for_test(TableId::new(table.id));
        Ok(())
    }

    async fn replace_materialized_view(
        &self,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
    ) -> Result<()> {
        table.stream_job_status = PbStreamJobStatus::Created as _;
        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
        self.catalog.write().update_table(&table);
        Ok(())
    }

    async fn create_view(&self, mut view: PbView) -> Result<()> {
        view.id = self.gen_id();
        self.catalog.write().create_view(&view);
        self.add_table_or_source_id(view.id, view.schema_id, view.database_id);
        Ok(())
    }

    async fn create_table(
        &self,
        source: Option<PbSource>,
        mut table: PbTable,
        graph: StreamFragmentGraph,
        _job_type: PbTableJobType,
        if_not_exists: bool,
    ) -> Result<()> {
        if let Some(source) = source {
            let source_id = self.create_source_inner(source)?;
            table.optional_associated_source_id =
                Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id));
        }
        self.create_materialized_view(table, graph, HashSet::new(), None, if_not_exists)
            .await?;
        Ok(())
    }

    async fn replace_table(
        &self,
        _source: Option<PbSource>,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
        _job_type: TableJobType,
    ) -> Result<()> {
        table.stream_job_status = PbStreamJobStatus::Created as _;
        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
        self.catalog.write().update_table(&table);
        Ok(())
    }

    async fn replace_source(&self, source: PbSource, _graph: StreamFragmentGraph) -> Result<()> {
        self.catalog.write().update_source(&source);
        Ok(())
    }

    async fn create_source(
        &self,
        source: PbSource,
        _graph: Option<StreamFragmentGraph>,
        _if_not_exists: bool,
    ) -> Result<()> {
        self.create_source_inner(source).map(|_| ())
    }

    async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        _affected_table_change: Option<ReplaceJobPlan>,
        _dependencies: HashSet<ObjectId>,
        _if_not_exists: bool,
    ) -> Result<()> {
        self.create_sink_inner(sink, graph)
    }

    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
        self.create_subscription_inner(subscription)
    }

    async fn create_index(
        &self,
        mut index: PbIndex,
        mut index_table: PbTable,
        _graph: StreamFragmentGraph,
        _if_not_exists: bool,
    ) -> Result<()> {
        index_table.id = self.gen_id();
        index_table.stream_job_status = PbStreamJobStatus::Created as _;
        index_table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
        self.catalog.write().create_table(&index_table);
        self.add_table_or_index_id(
            index_table.id,
            index_table.schema_id,
            index_table.database_id,
        );

        index.id = index_table.id;
        index.index_table_id = index_table.id;
        self.catalog.write().create_index(&index);
        Ok(())
    }

    async fn create_function(&self, _function: PbFunction) -> Result<()> {
        unreachable!()
    }

    async fn create_connection(
        &self,
        _connection_name: String,
        _database_id: u32,
        _schema_id: u32,
        _owner_id: u32,
        _connection: create_connection_request::Payload,
    ) -> Result<()> {
        unreachable!()
    }

    async fn create_secret(
        &self,
        _secret_name: String,
        _database_id: u32,
        _schema_id: u32,
        _owner_id: u32,
        _payload: Vec<u8>,
    ) -> Result<()> {
        unreachable!()
    }

    async fn comment_on(&self, _comment: PbComment) -> Result<()> {
        unreachable!()
    }

    async fn drop_table(
        &self,
        source_id: Option<u32>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        if let Some(source_id) = source_id {
            self.drop_table_or_source_id(source_id);
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.table_id);
        let indexes =
            self.catalog
                .read()
                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
        for index in indexes {
            self.drop_index(index.id, cascade).await?;
        }
        self.catalog
            .write()
            .drop_table(database_id, schema_id, table_id);
        if let Some(source_id) = source_id {
            self.catalog
                .write()
                .drop_source(database_id, schema_id, source_id);
        }
        Ok(())
    }

    async fn drop_view(&self, _view_id: u32, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.table_id);
        let indexes =
            self.catalog
                .read()
                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
        for index in indexes {
            self.drop_index(index.id, cascade).await?;
        }
        self.catalog
            .write()
            .drop_table(database_id, schema_id, table_id);
        Ok(())
    }

    async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(source_id);
        self.catalog
            .write()
            .drop_source(database_id, schema_id, source_id);
        Ok(())
    }

    async fn drop_sink(
        &self,
        sink_id: u32,
        cascade: bool,
        _target_table_change: Option<ReplaceJobPlan>,
    ) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_sink_id(sink_id);
        self.catalog
            .write()
            .drop_sink(database_id, schema_id, sink_id);
        Ok(())
    }

    async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_subscription_id(subscription_id);
        self.catalog
            .write()
            .drop_subscription(database_id, schema_id, subscription_id);
        Ok(())
    }

    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let &schema_id = self
            .table_id_to_schema_id
            .read()
            .get(&index_id.index_id)
            .unwrap();
        let database_id = self.get_database_id_by_schema(schema_id);

        let index = {
            let catalog_reader = self.catalog.read();
            let schema_catalog = catalog_reader
                .get_schema_by_id(&database_id, &schema_id)
                .unwrap();
            schema_catalog.get_index_by_id(&index_id).unwrap().clone()
        };

        let index_table_id = index.index_table.id;
        let (database_id, schema_id) = self.drop_table_or_index_id(index_id.index_id);
        self.catalog
            .write()
            .drop_index(database_id, schema_id, index_id);
        self.catalog
            .write()
            .drop_table(database_id, schema_id, index_table_id);
        Ok(())
    }

    async fn drop_function(&self, _function_id: FunctionId) -> Result<()> {
        unreachable!()
    }

    async fn drop_connection(&self, _connection_id: ConnectionId) -> Result<()> {
        unreachable!()
    }

    async fn drop_secret(&self, _secret_id: SecretId) -> Result<()> {
        unreachable!()
    }

    async fn drop_database(&self, database_id: u32) -> Result<()> {
        self.catalog.write().drop_database(database_id);
        Ok(())
    }

    async fn drop_schema(&self, schema_id: u32, _cascade: bool) -> Result<()> {
        let database_id = self.drop_schema_id(schema_id);
        self.catalog.write().drop_schema(database_id, schema_id);
        Ok(())
    }

    async fn alter_name(
        &self,
        object_id: alter_name_request::Object,
        object_name: &str,
    ) -> Result<()> {
        match object_id {
            alter_name_request::Object::TableId(table_id) => {
                self.catalog
                    .write()
                    .alter_table_name_by_id(&table_id.into(), object_name);
                Ok(())
            }
            _ => {
                unimplemented!()
            }
        }
    }

    async fn alter_source(&self, source: PbSource) -> Result<()> {
        self.catalog.write().update_source(&source);
        Ok(())
    }

    async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<()> {
        for database in self.catalog.read().iter_databases() {
            for schema in database.iter_schemas() {
                match object {
                    Object::TableId(table_id) => {
                        if let Some(table) =
                            schema.get_created_table_by_id(&TableId::from(table_id))
                        {
                            let mut pb_table = table.to_prost();
                            pb_table.owner = owner_id;
                            self.catalog.write().update_table(&pb_table);
                            return Ok(());
                        }
                    }
                    _ => unreachable!(),
                }
            }
        }

        Err(ErrorCode::ItemNotFound(format!("object not found: {:?}", object)).into())
    }

    async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: u32,
    ) -> Result<()> {
        match object {
            alter_set_schema_request::Object::TableId(table_id) => {
                let mut pb_table = {
                    let reader = self.catalog.read();
                    let table = reader.get_any_table_by_id(&table_id.into())?.to_owned();
                    table.to_prost()
                };
                pb_table.schema_id = new_schema_id;
                self.catalog.write().update_table(&pb_table);
                self.table_id_to_schema_id
                    .write()
                    .insert(table_id, new_schema_id);
                Ok(())
            }
            _ => unreachable!(),
        }
    }

    async fn alter_parallelism(
        &self,
        _table_id: u32,
        _parallelism: PbTableParallelism,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }

    async fn alter_swap_rename(&self, _object: alter_swap_rename_request::Object) -> Result<()> {
        todo!()
    }

    async fn alter_secret(
        &self,
        _secret_id: u32,
        _secret_name: String,
        _database_id: u32,
        _schema_id: u32,
        _owner_id: u32,
        _payload: Vec<u8>,
    ) -> Result<()> {
        unreachable!()
    }

    async fn alter_resource_group(
        &self,
        _table_id: u32,
        _resource_group: Option<String>,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }

    async fn alter_database_param(
        &self,
        database_id: u32,
        param: AlterDatabaseParam,
    ) -> Result<()> {
        let mut pb_database = {
            let reader = self.catalog.read();
            let database = reader.get_database_by_id(&database_id)?.to_owned();
            database.to_prost()
        };
        match param {
            AlterDatabaseParam::BarrierIntervalMs(interval) => {
                pb_database.barrier_interval_ms = interval;
            }
            AlterDatabaseParam::CheckpointFrequency(frequency) => {
                pb_database.checkpoint_frequency = frequency;
            }
        }
        self.catalog.write().update_database(&pb_database);
        Ok(())
    }
}

impl MockCatalogWriter {
    pub fn new(
        catalog: Arc<RwLock<Catalog>>,
        hummock_snapshot_manager: HummockSnapshotManagerRef,
    ) -> Self {
        catalog.write().create_database(&PbDatabase {
            id: 0,
            name: DEFAULT_DATABASE_NAME.to_owned(),
            owner: DEFAULT_SUPER_USER_ID,
            resource_group: DEFAULT_RESOURCE_GROUP.to_owned(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        });
        catalog.write().create_schema(&PbSchema {
            id: 1,
            name: DEFAULT_SCHEMA_NAME.to_owned(),
            database_id: 0,
            owner: DEFAULT_SUPER_USER_ID,
        });
        catalog.write().create_schema(&PbSchema {
            id: 2,
            name: PG_CATALOG_SCHEMA_NAME.to_owned(),
            database_id: 0,
            owner: DEFAULT_SUPER_USER_ID,
        });
        catalog.write().create_schema(&PbSchema {
            id: 3,
            name: RW_CATALOG_SCHEMA_NAME.to_owned(),
            database_id: 0,
            owner: DEFAULT_SUPER_USER_ID,
        });
        let mut map: HashMap<u32, DatabaseId> = HashMap::new();
        map.insert(1_u32, 0_u32);
        map.insert(2_u32, 0_u32);
        map.insert(3_u32, 0_u32);
        Self {
            catalog,
            id: AtomicU32::new(3),
            table_id_to_schema_id: Default::default(),
            schema_id_to_database_id: RwLock::new(map),
            hummock_snapshot_manager,
        }
    }

    fn gen_id(&self) -> u32 {
        // Id 0 is reserved for the default `dev` database and schema, so offset
        // the counter to skip it.
        self.id.fetch_add(1, Ordering::SeqCst) + 1
    }

    fn add_table_or_source_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn drop_table_or_source_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn add_table_or_sink_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn add_table_or_subscription_id(
        &self,
        table_id: u32,
        schema_id: SchemaId,
        _database_id: DatabaseId,
    ) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn add_table_or_index_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn drop_table_or_sink_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn drop_table_or_subscription_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn drop_table_or_index_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn add_schema_id(&self, schema_id: u32, database_id: DatabaseId) {
        self.schema_id_to_database_id
            .write()
            .insert(schema_id, database_id);
    }

    fn drop_schema_id(&self, schema_id: u32) -> DatabaseId {
        self.schema_id_to_database_id
            .write()
            .remove(&schema_id)
            .unwrap()
    }

    fn create_source_inner(&self, mut source: PbSource) -> Result<u32> {
        source.id = self.gen_id();
        self.catalog.write().create_source(&source);
        self.add_table_or_source_id(source.id, source.schema_id, source.database_id);
        Ok(source.id)
    }

    fn create_sink_inner(&self, mut sink: PbSink, _graph: StreamFragmentGraph) -> Result<()> {
        sink.id = self.gen_id();
        self.catalog.write().create_sink(&sink);
        self.add_table_or_sink_id(sink.id, sink.schema_id, sink.database_id);
        Ok(())
    }

    fn create_subscription_inner(&self, mut subscription: PbSubscription) -> Result<()> {
        subscription.id = self.gen_id();
        self.catalog.write().create_subscription(&subscription);
        self.add_table_or_subscription_id(
            subscription.id,
            subscription.schema_id,
            subscription.database_id,
        );
        Ok(())
    }

    fn get_database_id_by_schema(&self, schema_id: u32) -> DatabaseId {
        *self
            .schema_id_to_database_id
            .read()
            .get(&schema_id)
            .unwrap()
    }
}

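/// A mock [`UserInfoWriter`] backed by an in-memory [`UserInfoManager`].
/// The default super user is created up front in [`MockUserInfoWriter::new`].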
pub struct MockUserInfoWriter {
    id: AtomicU32,
    user_info: Arc<RwLock<UserInfoManager>>,
}

#[async_trait::async_trait]
impl UserInfoWriter for MockUserInfoWriter {
    async fn create_user(&self, user: UserInfo) -> Result<()> {
        let mut user = user;
        user.id = self.gen_id();
        self.user_info.write().create_user(user);
        Ok(())
    }

    async fn drop_user(&self, id: UserId) -> Result<()> {
        self.user_info.write().drop_user(id);
        Ok(())
    }

    async fn update_user(
        &self,
        update_user: UserInfo,
        update_fields: Vec<UpdateField>,
    ) -> Result<()> {
        let mut lock = self.user_info.write();
        let id = update_user.get_id();
        let Some(old_name) = lock.get_user_name_by_id(id) else {
            return Ok(());
        };
        let mut user_info = lock.get_user_by_name(&old_name).unwrap().to_prost();
        update_fields.into_iter().for_each(|field| match field {
            UpdateField::Super => user_info.is_super = update_user.is_super,
            UpdateField::Login => user_info.can_login = update_user.can_login,
            UpdateField::CreateDb => user_info.can_create_db = update_user.can_create_db,
            UpdateField::CreateUser => user_info.can_create_user = update_user.can_create_user,
            UpdateField::AuthInfo => user_info.auth_info.clone_from(&update_user.auth_info),
            UpdateField::Rename => user_info.name.clone_from(&update_user.name),
            UpdateField::Unspecified => unreachable!(),
        });
        // Apply the merged `user_info` (old values overlaid with the requested
        // fields) rather than the raw request, so unlisted fields are preserved.
        lock.update_user(user_info);
        Ok(())
    }

    /// `MockUserInfoWriter` does not support expanding privileges via
    /// `GrantAllTables` or `GrantAllSources` when granting privileges to a user.
    async fn grant_privilege(
        &self,
        users: Vec<UserId>,
        privileges: Vec<GrantPrivilege>,
        with_grant_option: bool,
        _grantor: UserId,
    ) -> Result<()> {
        let privileges = privileges
            .into_iter()
            .map(|mut p| {
                p.action_with_opts
                    .iter_mut()
                    .for_each(|ao| ao.with_grant_option = with_grant_option);
                p
            })
            .collect::<Vec<_>>();
        for user_id in users {
            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
                u.extend_privileges(privileges.clone());
            }
        }
        Ok(())
    }

    /// `MockUserInfoWriter` does not support expanding privileges via
    /// `RevokeAllTables` or `RevokeAllSources` when revoking privileges from a user.
    async fn revoke_privilege(
        &self,
        users: Vec<UserId>,
        privileges: Vec<GrantPrivilege>,
        _granted_by: UserId,
        _revoke_by: UserId,
        revoke_grant_option: bool,
        _cascade: bool,
    ) -> Result<()> {
        for user_id in users {
            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
                u.revoke_privileges(privileges.clone(), revoke_grant_option);
            }
        }
        Ok(())
    }

    async fn alter_default_privilege(
        &self,
        _users: Vec<UserId>,
        _database_id: DatabaseId,
        _schemas: Vec<SchemaId>,
        _operation: AlterDefaultPrivilegeOperation,
        _operated_by: UserId,
    ) -> Result<()> {
        todo!()
    }
}

impl MockUserInfoWriter {
    pub fn new(user_info: Arc<RwLock<UserInfoManager>>) -> Self {
        user_info.write().create_user(UserInfo {
            id: DEFAULT_SUPER_USER_ID,
            name: DEFAULT_SUPER_USER.to_owned(),
            is_super: true,
            can_create_db: true,
            can_create_user: true,
            can_login: true,
            ..Default::default()
        });
        Self {
            user_info,
            id: AtomicU32::new(NON_RESERVED_USER_ID as u32),
        }
    }

    fn gen_id(&self) -> u32 {
        self.id.fetch_add(1, Ordering::SeqCst)
    }
}

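/// A [`FrontendMetaClient`] stub: the calls that planner tests exercise return
/// empty or default values, while the remaining calls are left `unimplemented!()`.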
pub struct MockFrontendMetaClient {}

#[async_trait::async_trait]
impl FrontendMetaClient for MockFrontendMetaClient {
    async fn try_unregister(&self) {}

    async fn flush(&self, _database_id: DatabaseId) -> RpcResult<HummockVersionId> {
        Ok(INVALID_VERSION_ID)
    }

    async fn wait(&self) -> RpcResult<()> {
        Ok(())
    }

    async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult<Vec<u32>> {
        Ok(vec![])
    }

    async fn list_table_fragments(
        &self,
        _table_ids: &[u32],
    ) -> RpcResult<HashMap<u32, TableFragmentInfo>> {
        Ok(HashMap::default())
    }

    async fn list_streaming_job_states(&self) -> RpcResult<Vec<StreamingJobState>> {
        Ok(vec![])
    }

    async fn list_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
        Ok(vec![])
    }

    async fn list_creating_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
        Ok(vec![])
    }

    async fn list_actor_states(&self) -> RpcResult<Vec<ActorState>> {
        Ok(vec![])
    }

    async fn list_actor_splits(&self) -> RpcResult<Vec<ActorSplit>> {
        Ok(vec![])
    }

    async fn list_object_dependencies(&self) -> RpcResult<Vec<PbObjectDependencies>> {
        Ok(vec![])
    }

    async fn list_meta_snapshots(&self) -> RpcResult<Vec<MetaSnapshotMetadata>> {
        Ok(vec![])
    }

    async fn set_system_param(
        &self,
        _param: String,
        _value: Option<String>,
    ) -> RpcResult<Option<SystemParamsReader>> {
        Ok(Some(SystemParams::default().into()))
    }

    async fn get_session_params(&self) -> RpcResult<SessionConfig> {
        Ok(Default::default())
    }

    async fn set_session_param(&self, _param: String, _value: Option<String>) -> RpcResult<String> {
        Ok("".to_owned())
    }

    async fn get_ddl_progress(&self) -> RpcResult<Vec<DdlProgress>> {
        Ok(vec![])
    }

    async fn get_tables(
        &self,
        _table_ids: &[u32],
        _include_dropped_tables: bool,
    ) -> RpcResult<HashMap<u32, Table>> {
        Ok(HashMap::new())
    }

    async fn list_hummock_pinned_versions(&self) -> RpcResult<Vec<(u32, u64)>> {
        unimplemented!()
    }

    async fn get_hummock_current_version(&self) -> RpcResult<HummockVersion> {
        Ok(HummockVersion::default())
    }

    async fn get_hummock_checkpoint_version(&self) -> RpcResult<HummockVersion> {
        unimplemented!()
    }

    async fn list_version_deltas(&self) -> RpcResult<Vec<HummockVersionDelta>> {
        unimplemented!()
    }

    async fn list_branched_objects(&self) -> RpcResult<Vec<BranchedObject>> {
        unimplemented!()
    }

    async fn list_hummock_compaction_group_configs(&self) -> RpcResult<Vec<CompactionGroupInfo>> {
        unimplemented!()
    }

    async fn list_hummock_active_write_limits(&self) -> RpcResult<HashMap<u64, WriteLimit>> {
        unimplemented!()
    }

    async fn list_hummock_meta_configs(&self) -> RpcResult<HashMap<String, String>> {
        unimplemented!()
    }

    async fn list_event_log(&self) -> RpcResult<Vec<EventLog>> {
        unimplemented!()
    }

    async fn list_compact_task_assignment(&self) -> RpcResult<Vec<CompactTaskAssignment>> {
        unimplemented!()
    }

    async fn list_all_nodes(&self) -> RpcResult<Vec<WorkerNode>> {
        Ok(vec![])
    }

    async fn list_compact_task_progress(&self) -> RpcResult<Vec<CompactTaskProgress>> {
        unimplemented!()
    }

    async fn recover(&self) -> RpcResult<()> {
        unimplemented!()
    }

    async fn apply_throttle(
        &self,
        _kind: PbThrottleTarget,
        _id: u32,
        _rate_limit: Option<u32>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn get_cluster_recovery_status(&self) -> RpcResult<RecoveryStatus> {
        Ok(RecoveryStatus::StatusRunning)
    }

    async fn get_cluster_limits(&self) -> RpcResult<Vec<ClusterLimit>> {
        Ok(vec![])
    }

    async fn list_rate_limits(&self) -> RpcResult<Vec<RateLimitInfo>> {
        Ok(vec![])
    }

    async fn get_meta_store_endpoint(&self) -> RpcResult<String> {
        unimplemented!()
    }

    async fn alter_sink_props(
        &self,
        _sink_id: u32,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
        _connector_conn_ref: Option<u32>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn alter_source_connector_props(
        &self,
        _source_id: u32,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
        _connector_conn_ref: Option<u32>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn list_hosted_iceberg_tables(&self) -> RpcResult<Vec<IcebergTable>> {
        unimplemented!()
    }

    async fn get_fragment_by_id(
        &self,
        _fragment_id: u32,
    ) -> RpcResult<Option<FragmentDistribution>> {
        unimplemented!()
    }

    fn worker_id(&self) -> u32 {
        0
    }

    async fn set_sync_log_store_aligned(&self, _job_id: u32, _aligned: bool) -> RpcResult<()> {
        Ok(())
    }
}

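/// Protobuf schema used by source-related tests. Relative to `TestRecord`,
/// `TestRecordAlterType` changes field types and `TestRecordExt` adds a field
/// (presumably so tests can exercise schema changes).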
#[cfg(test)]
pub static PROTO_FILE_DATA: &str = r#"
    syntax = "proto3";
    package test;
    message TestRecord {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
    }
    message TestRecordAlterType {
      string id = 1;
      Country country = 3;
      int32 zipcode = 4;
      float rate = 5;
    }
    message TestRecordExt {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
      string name = 6;
    }
    message Country {
      string address = 1;
      City city = 2;
      string zipcode = 3;
    }
    message City {
      string address = 1;
      string zipcode = 2;
    }"#;

/// Compiles `proto_data` with `protoc` and returns the generated
/// descriptor-set file.
/// (The returned `NamedTempFile` deletes itself when it goes out of scope.)
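///
/// A usage sketch (hypothetical; requires a `protoc` binary on `PATH`):
///
/// ```ignore
/// let file = create_proto_file(PROTO_FILE_DATA);
/// let schema_location = format!("file://{}", file.path().display());
/// // `schema_location` can then be referenced by a test's CREATE SOURCE DDL.
/// ```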
pub fn create_proto_file(proto_data: &str) -> NamedTempFile {
    let in_file = Builder::new()
        .prefix("temp")
        .suffix(".proto")
        .rand_bytes(8)
        .tempfile()
        .unwrap();

    let out_file = Builder::new()
        .prefix("temp")
        .suffix(".pb")
        .rand_bytes(8)
        .tempfile()
        .unwrap();

    let mut file = in_file.as_file();
    file.write_all(proto_data.as_ref())
        .expect("writing binary to test file");
    file.flush().expect("flush temp file failed");
    let include_path = in_file
        .path()
        .parent()
        .unwrap()
        .to_string_lossy()
        .into_owned();
    let out_path = out_file.path().to_string_lossy().into_owned();
    let in_path = in_file.path().to_string_lossy().into_owned();
    let mut compile = std::process::Command::new("protoc");

    let out = compile
        .arg("--include_imports")
        .arg("-I")
        .arg(include_path)
        .arg(format!("--descriptor_set_out={}", out_path))
        .arg(in_path)
        .output()
        .expect("failed to compile proto");
    if !out.status.success() {
        panic!("compile proto failed \n output: {:?}", out);
    }
    out_file
}