risingwave_frontend/
test_utils.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::io::Write;
17use std::net::{IpAddr, Ipv4Addr, SocketAddr};
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU32, Ordering};
20
21use futures_async_stream::for_await;
22use parking_lot::RwLock;
23use pgwire::net::{Address, AddressRef};
24use pgwire::pg_response::StatementType;
25use pgwire::pg_server::{BoxedError, SessionId, SessionManager, UserAuthenticator};
26use pgwire::types::Row;
27use risingwave_common::catalog::{
28    DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_ID,
29    FunctionId, IndexId, NON_RESERVED_USER_ID, ObjectId, PG_CATALOG_SCHEMA_NAME,
30    RW_CATALOG_SCHEMA_NAME, TableId,
31};
32use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat};
33use risingwave_common::session_config::SessionConfig;
34use risingwave_common::system_param::reader::SystemParamsReader;
35use risingwave_common::util::cluster_limit::ClusterLimit;
36use risingwave_common::util::column_index_mapping::ColIndexMapping;
37use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
38use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
39use risingwave_hummock_sdk::{HummockVersionId, INVALID_VERSION_ID};
40use risingwave_pb::backup_service::MetaSnapshotMetadata;
41use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
42use risingwave_pb::catalog::{
43    PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbStreamJobStatus,
44    PbSubscription, PbTable, PbView, Table,
45};
46use risingwave_pb::common::WorkerNode;
47use risingwave_pb::ddl_service::alter_owner_request::Object;
48use risingwave_pb::ddl_service::{
49    DdlProgress, PbTableJobType, ReplaceJobPlan, TableJobType, alter_name_request,
50    alter_set_schema_request, alter_swap_rename_request, create_connection_request,
51};
52use risingwave_pb::hummock::write_limits::WriteLimit;
53use risingwave_pb::hummock::{
54    BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
55};
56use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
57use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
58use risingwave_pb::meta::list_actor_states_response::ActorState;
59use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution;
60use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
61use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
62use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
63use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
64use risingwave_pb::meta::{
65    EventLog, PbTableParallelism, PbThrottleTarget, RecoveryStatus, SystemParams,
66};
67use risingwave_pb::stream_plan::StreamFragmentGraph;
68use risingwave_pb::user::update_user_request::UpdateField;
69use risingwave_pb::user::{GrantPrivilege, UserInfo};
70use risingwave_rpc_client::error::Result as RpcResult;
71use tempfile::{Builder, NamedTempFile};
72
73use crate::FrontendOpts;
74use crate::catalog::catalog_service::CatalogWriter;
75use crate::catalog::root_catalog::Catalog;
76use crate::catalog::{ConnectionId, DatabaseId, SchemaId, SecretId};
77use crate::error::{ErrorCode, Result};
78use crate::handler::RwPgResponse;
79use crate::meta_client::FrontendMetaClient;
80use crate::scheduler::HummockSnapshotManagerRef;
81use crate::session::{AuthContext, FrontendEnv, SessionImpl};
82use crate::user::UserId;
83use crate::user::user_manager::UserInfoManager;
84use crate::user::user_service::UserInfoWriter;
85
/// An embedded frontend that runs without starting a meta service and without serving as a TCP
/// server.
pub struct LocalFrontend {
    /// Options this frontend was constructed with.
    pub opts: FrontendOpts,
    /// Environment that is cloned into every session created by this frontend.
    env: FrontendEnv,
}
91
92impl SessionManager for LocalFrontend {
93    type Session = SessionImpl;
94
95    fn create_dummy_session(
96        &self,
97        _database_id: u32,
98        _user_name: u32,
99    ) -> std::result::Result<Arc<Self::Session>, BoxedError> {
100        unreachable!()
101    }
102
103    fn connect(
104        &self,
105        _database: &str,
106        _user_name: &str,
107        _peer_addr: AddressRef,
108    ) -> std::result::Result<Arc<Self::Session>, BoxedError> {
109        Ok(self.session_ref())
110    }
111
112    fn cancel_queries_in_session(&self, _session_id: SessionId) {
113        unreachable!()
114    }
115
116    fn cancel_creating_jobs_in_session(&self, _session_id: SessionId) {
117        unreachable!()
118    }
119
120    fn end_session(&self, _session: &Self::Session) {
121        unreachable!()
122    }
123}
124
125impl LocalFrontend {
126    #[expect(clippy::unused_async)]
127    pub async fn new(opts: FrontendOpts) -> Self {
128        let env = FrontendEnv::mock();
129        Self { opts, env }
130    }
131
132    pub async fn run_sql(
133        &self,
134        sql: impl Into<String>,
135    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
136        let sql: Arc<str> = Arc::from(sql.into());
137        self.session_ref().run_statement(sql, vec![]).await
138    }
139
140    pub async fn run_sql_with_session(
141        &self,
142        session_ref: Arc<SessionImpl>,
143        sql: impl Into<String>,
144    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
145        let sql: Arc<str> = Arc::from(sql.into());
146        session_ref.run_statement(sql, vec![]).await
147    }
148
149    pub async fn run_user_sql(
150        &self,
151        sql: impl Into<String>,
152        database: String,
153        user_name: String,
154        user_id: UserId,
155    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
156        let sql: Arc<str> = Arc::from(sql.into());
157        self.session_user_ref(database, user_name, user_id)
158            .run_statement(sql, vec![])
159            .await
160    }
161
162    pub async fn query_formatted_result(&self, sql: impl Into<String>) -> Vec<String> {
163        let mut rsp = self.run_sql(sql).await.unwrap();
164        let mut res = vec![];
165        #[for_await]
166        for row_set in rsp.values_stream() {
167            for row in row_set.unwrap() {
168                res.push(format!("{:?}", row));
169            }
170        }
171        res
172    }
173
174    pub async fn get_explain_output(&self, sql: impl Into<String>) -> String {
175        let mut rsp = self.run_sql(sql).await.unwrap();
176        assert_eq!(rsp.stmt_type(), StatementType::EXPLAIN);
177        let mut res = String::new();
178        #[for_await]
179        for row_set in rsp.values_stream() {
180            for row in row_set.unwrap() {
181                let row: Row = row;
182                let row = row.values()[0].as_ref().unwrap();
183                res += std::str::from_utf8(row).unwrap();
184                res += "\n";
185            }
186        }
187        res
188    }
189
190    /// Creates a new session
191    pub fn session_ref(&self) -> Arc<SessionImpl> {
192        self.session_user_ref(
193            DEFAULT_DATABASE_NAME.to_owned(),
194            DEFAULT_SUPER_USER.to_owned(),
195            DEFAULT_SUPER_USER_ID,
196        )
197    }
198
199    pub fn session_user_ref(
200        &self,
201        database: String,
202        user_name: String,
203        user_id: UserId,
204    ) -> Arc<SessionImpl> {
205        Arc::new(SessionImpl::new(
206            self.env.clone(),
207            AuthContext::new(database, user_name, user_id),
208            UserAuthenticator::None,
209            // Local Frontend use a non-sense id.
210            (0, 0),
211            Address::Tcp(SocketAddr::new(
212                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
213                6666,
214            ))
215            .into(),
216            Default::default(),
217        ))
218    }
219}
220
221pub async fn get_explain_output(mut rsp: RwPgResponse) -> String {
222    if rsp.stmt_type() != StatementType::EXPLAIN {
223        panic!("RESPONSE INVALID: {rsp:?}");
224    }
225    let mut res = String::new();
226    #[for_await]
227    for row_set in rsp.values_stream() {
228        for row in row_set.unwrap() {
229            let row: Row = row;
230            let row = row.values()[0].as_ref().unwrap();
231            res += std::str::from_utf8(row).unwrap();
232            res += "\n";
233        }
234    }
235    res
236}
237
/// Mock implementation of `CatalogWriter` that applies catalog changes directly to an
/// in-process `Catalog`.
pub struct MockCatalogWriter {
    /// Catalog that every operation reads from and writes to.
    catalog: Arc<RwLock<Catalog>>,
    /// Counter backing `gen_id`.
    id: AtomicU32,
    /// Schema of every tracked object (tables, views, sources, sinks, subscriptions and index
    /// tables), keyed by object id.
    table_id_to_schema_id: RwLock<HashMap<u32, SchemaId>>,
    /// Database of every known schema, keyed by schema id.
    schema_id_to_database_id: RwLock<HashMap<u32, DatabaseId>>,
    /// Informed about newly created tables through `add_table_for_test`.
    hummock_snapshot_manager: HummockSnapshotManagerRef,
}
245
246#[async_trait::async_trait]
247impl CatalogWriter for MockCatalogWriter {
248    async fn create_database(
249        &self,
250        db_name: &str,
251        owner: UserId,
252        resource_group: &str,
253    ) -> Result<()> {
254        let database_id = self.gen_id();
255        self.catalog.write().create_database(&PbDatabase {
256            name: db_name.to_owned(),
257            id: database_id,
258            owner,
259            resource_group: resource_group.to_owned(),
260        });
261        self.create_schema(database_id, DEFAULT_SCHEMA_NAME, owner)
262            .await?;
263        self.create_schema(database_id, PG_CATALOG_SCHEMA_NAME, owner)
264            .await?;
265        self.create_schema(database_id, RW_CATALOG_SCHEMA_NAME, owner)
266            .await?;
267        Ok(())
268    }
269
270    async fn create_schema(
271        &self,
272        db_id: DatabaseId,
273        schema_name: &str,
274        owner: UserId,
275    ) -> Result<()> {
276        let id = self.gen_id();
277        self.catalog.write().create_schema(&PbSchema {
278            id,
279            name: schema_name.to_owned(),
280            database_id: db_id,
281            owner,
282        });
283        self.add_schema_id(id, db_id);
284        Ok(())
285    }
286
287    async fn create_materialized_view(
288        &self,
289        mut table: PbTable,
290        _graph: StreamFragmentGraph,
291        _dependencies: HashSet<ObjectId>,
292        _specific_resource_group: Option<String>,
293    ) -> Result<()> {
294        table.id = self.gen_id();
295        table.stream_job_status = PbStreamJobStatus::Created as _;
296        table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
297        self.catalog.write().create_table(&table);
298        self.add_table_or_source_id(table.id, table.schema_id, table.database_id);
299        self.hummock_snapshot_manager
300            .add_table_for_test(TableId::new(table.id));
301        Ok(())
302    }
303
304    async fn create_view(&self, mut view: PbView) -> Result<()> {
305        view.id = self.gen_id();
306        self.catalog.write().create_view(&view);
307        self.add_table_or_source_id(view.id, view.schema_id, view.database_id);
308        Ok(())
309    }
310
311    async fn create_table(
312        &self,
313        source: Option<PbSource>,
314        mut table: PbTable,
315        graph: StreamFragmentGraph,
316        _job_type: PbTableJobType,
317    ) -> Result<()> {
318        if let Some(source) = source {
319            let source_id = self.create_source_inner(source)?;
320            table.optional_associated_source_id =
321                Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id));
322        }
323        self.create_materialized_view(table, graph, HashSet::new(), None)
324            .await?;
325        Ok(())
326    }
327
328    async fn replace_table(
329        &self,
330        _source: Option<PbSource>,
331        mut table: PbTable,
332        _graph: StreamFragmentGraph,
333        _mapping: ColIndexMapping,
334        _job_type: TableJobType,
335    ) -> Result<()> {
336        table.stream_job_status = PbStreamJobStatus::Created as _;
337        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
338        self.catalog.write().update_table(&table);
339        Ok(())
340    }
341
342    async fn replace_source(
343        &self,
344        source: PbSource,
345        _graph: StreamFragmentGraph,
346        _mapping: ColIndexMapping,
347    ) -> Result<()> {
348        self.catalog.write().update_source(&source);
349        Ok(())
350    }
351
352    async fn create_source(
353        &self,
354        source: PbSource,
355        _graph: Option<StreamFragmentGraph>,
356    ) -> Result<()> {
357        self.create_source_inner(source).map(|_| ())
358    }
359
360    async fn create_sink(
361        &self,
362        sink: PbSink,
363        graph: StreamFragmentGraph,
364        _affected_table_change: Option<ReplaceJobPlan>,
365        _dependencies: HashSet<ObjectId>,
366    ) -> Result<()> {
367        self.create_sink_inner(sink, graph)
368    }
369
370    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
371        self.create_subscription_inner(subscription)
372    }
373
374    async fn create_index(
375        &self,
376        mut index: PbIndex,
377        mut index_table: PbTable,
378        _graph: StreamFragmentGraph,
379    ) -> Result<()> {
380        index_table.id = self.gen_id();
381        index_table.stream_job_status = PbStreamJobStatus::Created as _;
382        index_table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
383        self.catalog.write().create_table(&index_table);
384        self.add_table_or_index_id(
385            index_table.id,
386            index_table.schema_id,
387            index_table.database_id,
388        );
389
390        index.id = index_table.id;
391        index.index_table_id = index_table.id;
392        self.catalog.write().create_index(&index);
393        Ok(())
394    }
395
    /// Not supported by the mock writer; reaching this method panics via `unreachable!()`.
    async fn create_function(&self, _function: PbFunction) -> Result<()> {
        unreachable!()
    }

    /// Not supported by the mock writer; reaching this method panics via `unreachable!()`.
    async fn create_connection(
        &self,
        _connection_name: String,
        _database_id: u32,
        _schema_id: u32,
        _owner_id: u32,
        _connection: create_connection_request::Payload,
    ) -> Result<()> {
        unreachable!()
    }

    /// Not supported by the mock writer; reaching this method panics via `unreachable!()`.
    async fn create_secret(
        &self,
        _secret_name: String,
        _database_id: u32,
        _schema_id: u32,
        _owner_id: u32,
        _payload: Vec<u8>,
    ) -> Result<()> {
        unreachable!()
    }

    /// Not supported by the mock writer; reaching this method panics via `unreachable!()`.
    async fn comment_on(&self, _comment: PbComment) -> Result<()> {
        unreachable!()
    }
425
426    async fn drop_table(
427        &self,
428        source_id: Option<u32>,
429        table_id: TableId,
430        cascade: bool,
431    ) -> Result<()> {
432        if cascade {
433            return Err(ErrorCode::NotSupported(
434                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
435                "use drop instead".to_owned(),
436            )
437            .into());
438        }
439        if let Some(source_id) = source_id {
440            self.drop_table_or_source_id(source_id);
441        }
442        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.table_id);
443        let indexes =
444            self.catalog
445                .read()
446                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
447        for index in indexes {
448            self.drop_index(index.id, cascade).await?;
449        }
450        self.catalog
451            .write()
452            .drop_table(database_id, schema_id, table_id);
453        if let Some(source_id) = source_id {
454            self.catalog
455                .write()
456                .drop_source(database_id, schema_id, source_id);
457        }
458        Ok(())
459    }
460
    /// Not supported by the mock writer; reaching this method panics via `unreachable!()`.
    async fn drop_view(&self, _view_id: u32, _cascade: bool) -> Result<()> {
        unreachable!()
    }
464
465    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
466        if cascade {
467            return Err(ErrorCode::NotSupported(
468                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
469                "use drop instead".to_owned(),
470            )
471            .into());
472        }
473        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.table_id);
474        let indexes =
475            self.catalog
476                .read()
477                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
478        for index in indexes {
479            self.drop_index(index.id, cascade).await?;
480        }
481        self.catalog
482            .write()
483            .drop_table(database_id, schema_id, table_id);
484        Ok(())
485    }
486
487    async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<()> {
488        if cascade {
489            return Err(ErrorCode::NotSupported(
490                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
491                "use drop instead".to_owned(),
492            )
493            .into());
494        }
495        let (database_id, schema_id) = self.drop_table_or_source_id(source_id);
496        self.catalog
497            .write()
498            .drop_source(database_id, schema_id, source_id);
499        Ok(())
500    }
501
502    async fn drop_sink(
503        &self,
504        sink_id: u32,
505        cascade: bool,
506        _target_table_change: Option<ReplaceJobPlan>,
507    ) -> Result<()> {
508        if cascade {
509            return Err(ErrorCode::NotSupported(
510                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
511                "use drop instead".to_owned(),
512            )
513            .into());
514        }
515        let (database_id, schema_id) = self.drop_table_or_sink_id(sink_id);
516        self.catalog
517            .write()
518            .drop_sink(database_id, schema_id, sink_id);
519        Ok(())
520    }
521
522    async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()> {
523        if cascade {
524            return Err(ErrorCode::NotSupported(
525                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
526                "use drop instead".to_owned(),
527            )
528            .into());
529        }
530        let (database_id, schema_id) = self.drop_table_or_subscription_id(subscription_id);
531        self.catalog
532            .write()
533            .drop_subscription(database_id, schema_id, subscription_id);
534        Ok(())
535    }
536
537    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
538        if cascade {
539            return Err(ErrorCode::NotSupported(
540                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
541                "use drop instead".to_owned(),
542            )
543            .into());
544        }
545        let &schema_id = self
546            .table_id_to_schema_id
547            .read()
548            .get(&index_id.index_id)
549            .unwrap();
550        let database_id = self.get_database_id_by_schema(schema_id);
551
552        let index = {
553            let catalog_reader = self.catalog.read();
554            let schema_catalog = catalog_reader
555                .get_schema_by_id(&database_id, &schema_id)
556                .unwrap();
557            schema_catalog.get_index_by_id(&index_id).unwrap().clone()
558        };
559
560        let index_table_id = index.index_table.id;
561        let (database_id, schema_id) = self.drop_table_or_index_id(index_id.index_id);
562        self.catalog
563            .write()
564            .drop_index(database_id, schema_id, index_id);
565        self.catalog
566            .write()
567            .drop_table(database_id, schema_id, index_table_id);
568        Ok(())
569    }
570
    /// Not supported by the mock writer; reaching this method panics via `unreachable!()`.
    async fn drop_function(&self, _function_id: FunctionId) -> Result<()> {
        unreachable!()
    }

    /// Not supported by the mock writer; reaching this method panics via `unreachable!()`.
    async fn drop_connection(&self, _connection_id: ConnectionId) -> Result<()> {
        unreachable!()
    }

    /// Not supported by the mock writer; reaching this method panics via `unreachable!()`.
    async fn drop_secret(&self, _secret_id: SecretId) -> Result<()> {
        unreachable!()
    }
582
583    async fn drop_database(&self, database_id: u32) -> Result<()> {
584        self.catalog.write().drop_database(database_id);
585        Ok(())
586    }
587
588    async fn drop_schema(&self, schema_id: u32, _cascade: bool) -> Result<()> {
589        let database_id = self.drop_schema_id(schema_id);
590        self.catalog.write().drop_schema(database_id, schema_id);
591        Ok(())
592    }
593
594    async fn alter_name(
595        &self,
596        object_id: alter_name_request::Object,
597        object_name: &str,
598    ) -> Result<()> {
599        match object_id {
600            alter_name_request::Object::TableId(table_id) => {
601                self.catalog
602                    .write()
603                    .alter_table_name_by_id(&table_id.into(), object_name);
604                Ok(())
605            }
606            _ => {
607                unimplemented!()
608            }
609        }
610    }
611
612    async fn alter_source(&self, source: PbSource) -> Result<()> {
613        self.catalog.write().update_source(&source);
614        Ok(())
615    }
616
617    async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<()> {
618        for database in self.catalog.read().iter_databases() {
619            for schema in database.iter_schemas() {
620                match object {
621                    Object::TableId(table_id) => {
622                        if let Some(table) =
623                            schema.get_created_table_by_id(&TableId::from(table_id))
624                        {
625                            let mut pb_table = table.to_prost();
626                            pb_table.owner = owner_id;
627                            self.catalog.write().update_table(&pb_table);
628                            return Ok(());
629                        }
630                    }
631                    _ => unreachable!(),
632                }
633            }
634        }
635
636        Err(ErrorCode::ItemNotFound(format!("object not found: {:?}", object)).into())
637    }
638
639    async fn alter_set_schema(
640        &self,
641        object: alter_set_schema_request::Object,
642        new_schema_id: u32,
643    ) -> Result<()> {
644        match object {
645            alter_set_schema_request::Object::TableId(table_id) => {
646                let mut pb_table = {
647                    let reader = self.catalog.read();
648                    let table = reader.get_any_table_by_id(&table_id.into())?.to_owned();
649                    table.to_prost()
650                };
651                pb_table.schema_id = new_schema_id;
652                self.catalog.write().update_table(&pb_table);
653                self.table_id_to_schema_id
654                    .write()
655                    .insert(table_id, new_schema_id);
656                Ok(())
657            }
658            _ => unreachable!(),
659        }
660    }
661
    /// Not implemented in the mock writer; calling it panics via `todo!()`.
    async fn alter_parallelism(
        &self,
        _table_id: u32,
        _parallelism: PbTableParallelism,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }

    /// Not implemented in the mock writer; calling it panics via `todo!()`.
    async fn alter_swap_rename(&self, _object: alter_swap_rename_request::Object) -> Result<()> {
        todo!()
    }

    /// Not supported by the mock writer; reaching this method panics via `unreachable!()`.
    async fn alter_secret(
        &self,
        _secret_id: u32,
        _secret_name: String,
        _database_id: u32,
        _schema_id: u32,
        _owner_id: u32,
        _payload: Vec<u8>,
    ) -> Result<()> {
        unreachable!()
    }

    /// Not implemented in the mock writer; calling it panics via `todo!()`.
    async fn alter_resource_group(
        &self,
        _table_id: u32,
        _resource_group: Option<String>,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }
695}
696
697impl MockCatalogWriter {
698    pub fn new(
699        catalog: Arc<RwLock<Catalog>>,
700        hummock_snapshot_manager: HummockSnapshotManagerRef,
701    ) -> Self {
702        catalog.write().create_database(&PbDatabase {
703            id: 0,
704            name: DEFAULT_DATABASE_NAME.to_owned(),
705            owner: DEFAULT_SUPER_USER_ID,
706            resource_group: DEFAULT_RESOURCE_GROUP.to_owned(),
707        });
708        catalog.write().create_schema(&PbSchema {
709            id: 1,
710            name: DEFAULT_SCHEMA_NAME.to_owned(),
711            database_id: 0,
712            owner: DEFAULT_SUPER_USER_ID,
713        });
714        catalog.write().create_schema(&PbSchema {
715            id: 2,
716            name: PG_CATALOG_SCHEMA_NAME.to_owned(),
717            database_id: 0,
718            owner: DEFAULT_SUPER_USER_ID,
719        });
720        catalog.write().create_schema(&PbSchema {
721            id: 3,
722            name: RW_CATALOG_SCHEMA_NAME.to_owned(),
723            database_id: 0,
724            owner: DEFAULT_SUPER_USER_ID,
725        });
726        let mut map: HashMap<u32, DatabaseId> = HashMap::new();
727        map.insert(1_u32, 0_u32);
728        map.insert(2_u32, 0_u32);
729        map.insert(3_u32, 0_u32);
730        Self {
731            catalog,
732            id: AtomicU32::new(3),
733            table_id_to_schema_id: Default::default(),
734            schema_id_to_database_id: RwLock::new(map),
735            hummock_snapshot_manager,
736        }
737    }
738
739    fn gen_id(&self) -> u32 {
740        // Since the 0 value is `dev` schema and database, so jump out the 0 value.
741        self.id.fetch_add(1, Ordering::SeqCst) + 1
742    }
743
744    fn add_table_or_source_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
745        self.table_id_to_schema_id
746            .write()
747            .insert(table_id, schema_id);
748    }
749
750    fn drop_table_or_source_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
751        let schema_id = self
752            .table_id_to_schema_id
753            .write()
754            .remove(&table_id)
755            .unwrap();
756        (self.get_database_id_by_schema(schema_id), schema_id)
757    }
758
759    fn add_table_or_sink_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
760        self.table_id_to_schema_id
761            .write()
762            .insert(table_id, schema_id);
763    }
764
765    fn add_table_or_subscription_id(
766        &self,
767        table_id: u32,
768        schema_id: SchemaId,
769        _database_id: DatabaseId,
770    ) {
771        self.table_id_to_schema_id
772            .write()
773            .insert(table_id, schema_id);
774    }
775
776    fn add_table_or_index_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
777        self.table_id_to_schema_id
778            .write()
779            .insert(table_id, schema_id);
780    }
781
782    fn drop_table_or_sink_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
783        let schema_id = self
784            .table_id_to_schema_id
785            .write()
786            .remove(&table_id)
787            .unwrap();
788        (self.get_database_id_by_schema(schema_id), schema_id)
789    }
790
791    fn drop_table_or_subscription_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
792        let schema_id = self
793            .table_id_to_schema_id
794            .write()
795            .remove(&table_id)
796            .unwrap();
797        (self.get_database_id_by_schema(schema_id), schema_id)
798    }
799
800    fn drop_table_or_index_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
801        let schema_id = self
802            .table_id_to_schema_id
803            .write()
804            .remove(&table_id)
805            .unwrap();
806        (self.get_database_id_by_schema(schema_id), schema_id)
807    }
808
809    fn add_schema_id(&self, schema_id: u32, database_id: DatabaseId) {
810        self.schema_id_to_database_id
811            .write()
812            .insert(schema_id, database_id);
813    }
814
815    fn drop_schema_id(&self, schema_id: u32) -> DatabaseId {
816        self.schema_id_to_database_id
817            .write()
818            .remove(&schema_id)
819            .unwrap()
820    }
821
822    fn create_source_inner(&self, mut source: PbSource) -> Result<u32> {
823        source.id = self.gen_id();
824        self.catalog.write().create_source(&source);
825        self.add_table_or_source_id(source.id, source.schema_id, source.database_id);
826        Ok(source.id)
827    }
828
829    fn create_sink_inner(&self, mut sink: PbSink, _graph: StreamFragmentGraph) -> Result<()> {
830        sink.id = self.gen_id();
831        self.catalog.write().create_sink(&sink);
832        self.add_table_or_sink_id(sink.id, sink.schema_id, sink.database_id);
833        Ok(())
834    }
835
836    fn create_subscription_inner(&self, mut subscription: PbSubscription) -> Result<()> {
837        subscription.id = self.gen_id();
838        self.catalog.write().create_subscription(&subscription);
839        self.add_table_or_subscription_id(
840            subscription.id,
841            subscription.schema_id,
842            subscription.database_id,
843        );
844        Ok(())
845    }
846
847    fn get_database_id_by_schema(&self, schema_id: u32) -> DatabaseId {
848        *self
849            .schema_id_to_database_id
850            .read()
851            .get(&schema_id)
852            .unwrap()
853    }
854}
855
/// Mock implementation of `UserInfoWriter` that applies user changes directly to an in-process
/// `UserInfoManager`.
pub struct MockUserInfoWriter {
    /// Counter used to assign ids to newly created users.
    id: AtomicU32,
    /// User manager that every operation reads from and writes to.
    user_info: Arc<RwLock<UserInfoManager>>,
}
860
861#[async_trait::async_trait]
862impl UserInfoWriter for MockUserInfoWriter {
863    async fn create_user(&self, user: UserInfo) -> Result<()> {
864        let mut user = user;
865        user.id = self.gen_id();
866        self.user_info.write().create_user(user);
867        Ok(())
868    }
869
870    async fn drop_user(&self, id: UserId) -> Result<()> {
871        self.user_info.write().drop_user(id);
872        Ok(())
873    }
874
875    async fn update_user(
876        &self,
877        update_user: UserInfo,
878        update_fields: Vec<UpdateField>,
879    ) -> Result<()> {
880        let mut lock = self.user_info.write();
881        let id = update_user.get_id();
882        let Some(old_name) = lock.get_user_name_by_id(id) else {
883            return Ok(());
884        };
885        let mut user_info = lock.get_user_by_name(&old_name).unwrap().to_prost();
886        update_fields.into_iter().for_each(|field| match field {
887            UpdateField::Super => user_info.is_super = update_user.is_super,
888            UpdateField::Login => user_info.can_login = update_user.can_login,
889            UpdateField::CreateDb => user_info.can_create_db = update_user.can_create_db,
890            UpdateField::CreateUser => user_info.can_create_user = update_user.can_create_user,
891            UpdateField::AuthInfo => user_info.auth_info.clone_from(&update_user.auth_info),
892            UpdateField::Rename => user_info.name.clone_from(&update_user.name),
893            UpdateField::Unspecified => unreachable!(),
894        });
895        lock.update_user(update_user);
896        Ok(())
897    }
898
899    /// In `MockUserInfoWriter`, we don't support expand privilege with `GrantAllTables` and
900    /// `GrantAllSources` when grant privilege to user.
901    async fn grant_privilege(
902        &self,
903        users: Vec<UserId>,
904        privileges: Vec<GrantPrivilege>,
905        with_grant_option: bool,
906        _grantor: UserId,
907    ) -> Result<()> {
908        let privileges = privileges
909            .into_iter()
910            .map(|mut p| {
911                p.action_with_opts
912                    .iter_mut()
913                    .for_each(|ao| ao.with_grant_option = with_grant_option);
914                p
915            })
916            .collect::<Vec<_>>();
917        for user_id in users {
918            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
919                u.extend_privileges(privileges.clone());
920            }
921        }
922        Ok(())
923    }
924
925    /// In `MockUserInfoWriter`, we don't support expand privilege with `RevokeAllTables` and
926    /// `RevokeAllSources` when revoke privilege from user.
927    async fn revoke_privilege(
928        &self,
929        users: Vec<UserId>,
930        privileges: Vec<GrantPrivilege>,
931        _granted_by: UserId,
932        _revoke_by: UserId,
933        revoke_grant_option: bool,
934        _cascade: bool,
935    ) -> Result<()> {
936        for user_id in users {
937            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
938                u.revoke_privileges(privileges.clone(), revoke_grant_option);
939            }
940        }
941        Ok(())
942    }
943}
944
945impl MockUserInfoWriter {
946    pub fn new(user_info: Arc<RwLock<UserInfoManager>>) -> Self {
947        user_info.write().create_user(UserInfo {
948            id: DEFAULT_SUPER_USER_ID,
949            name: DEFAULT_SUPER_USER.to_owned(),
950            is_super: true,
951            can_create_db: true,
952            can_create_user: true,
953            can_login: true,
954            ..Default::default()
955        });
956        Self {
957            user_info,
958            id: AtomicU32::new(NON_RESERVED_USER_ID as u32),
959        }
960    }
961
962    fn gen_id(&self) -> u32 {
963        self.id.fetch_add(1, Ordering::SeqCst)
964    }
965}
966
/// Stateless mock `FrontendMetaClient`: its methods either return fixed placeholder values
/// or are left `unimplemented!()`.
pub struct MockFrontendMetaClient {}
968
969#[async_trait::async_trait]
970impl FrontendMetaClient for MockFrontendMetaClient {
971    async fn try_unregister(&self) {}
972
973    async fn flush(&self, _database_id: DatabaseId) -> RpcResult<HummockVersionId> {
974        Ok(INVALID_VERSION_ID)
975    }
976
977    async fn wait(&self) -> RpcResult<()> {
978        Ok(())
979    }
980
981    async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult<Vec<u32>> {
982        Ok(vec![])
983    }
984
985    async fn list_table_fragments(
986        &self,
987        _table_ids: &[u32],
988    ) -> RpcResult<HashMap<u32, TableFragmentInfo>> {
989        Ok(HashMap::default())
990    }
991
992    async fn list_streaming_job_states(&self) -> RpcResult<Vec<StreamingJobState>> {
993        Ok(vec![])
994    }
995
996    async fn list_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
997        Ok(vec![])
998    }
999
1000    async fn list_actor_states(&self) -> RpcResult<Vec<ActorState>> {
1001        Ok(vec![])
1002    }
1003
1004    async fn list_actor_splits(&self) -> RpcResult<Vec<ActorSplit>> {
1005        Ok(vec![])
1006    }
1007
1008    async fn list_object_dependencies(&self) -> RpcResult<Vec<PbObjectDependencies>> {
1009        Ok(vec![])
1010    }
1011
1012    async fn list_meta_snapshots(&self) -> RpcResult<Vec<MetaSnapshotMetadata>> {
1013        Ok(vec![])
1014    }
1015
1016    async fn get_system_params(&self) -> RpcResult<SystemParamsReader> {
1017        Ok(SystemParams::default().into())
1018    }
1019
1020    async fn set_system_param(
1021        &self,
1022        _param: String,
1023        _value: Option<String>,
1024    ) -> RpcResult<Option<SystemParamsReader>> {
1025        Ok(Some(SystemParams::default().into()))
1026    }
1027
1028    async fn get_session_params(&self) -> RpcResult<SessionConfig> {
1029        Ok(Default::default())
1030    }
1031
1032    async fn set_session_param(&self, _param: String, _value: Option<String>) -> RpcResult<String> {
1033        Ok("".to_owned())
1034    }
1035
1036    async fn get_ddl_progress(&self) -> RpcResult<Vec<DdlProgress>> {
1037        Ok(vec![])
1038    }
1039
1040    async fn get_tables(
1041        &self,
1042        _table_ids: &[u32],
1043        _include_dropped_tables: bool,
1044    ) -> RpcResult<HashMap<u32, Table>> {
1045        Ok(HashMap::new())
1046    }
1047
1048    async fn list_hummock_pinned_versions(&self) -> RpcResult<Vec<(u32, u64)>> {
1049        unimplemented!()
1050    }
1051
1052    async fn get_hummock_current_version(&self) -> RpcResult<HummockVersion> {
1053        Ok(HummockVersion::default())
1054    }
1055
1056    async fn get_hummock_checkpoint_version(&self) -> RpcResult<HummockVersion> {
1057        unimplemented!()
1058    }
1059
1060    async fn list_version_deltas(&self) -> RpcResult<Vec<HummockVersionDelta>> {
1061        unimplemented!()
1062    }
1063
1064    async fn list_branched_objects(&self) -> RpcResult<Vec<BranchedObject>> {
1065        unimplemented!()
1066    }
1067
1068    async fn list_hummock_compaction_group_configs(&self) -> RpcResult<Vec<CompactionGroupInfo>> {
1069        unimplemented!()
1070    }
1071
1072    async fn list_hummock_active_write_limits(&self) -> RpcResult<HashMap<u64, WriteLimit>> {
1073        unimplemented!()
1074    }
1075
1076    async fn list_hummock_meta_configs(&self) -> RpcResult<HashMap<String, String>> {
1077        unimplemented!()
1078    }
1079
1080    async fn list_event_log(&self) -> RpcResult<Vec<EventLog>> {
1081        unimplemented!()
1082    }
1083
1084    async fn list_compact_task_assignment(&self) -> RpcResult<Vec<CompactTaskAssignment>> {
1085        unimplemented!()
1086    }
1087
1088    async fn list_all_nodes(&self) -> RpcResult<Vec<WorkerNode>> {
1089        Ok(vec![])
1090    }
1091
1092    async fn list_compact_task_progress(&self) -> RpcResult<Vec<CompactTaskProgress>> {
1093        unimplemented!()
1094    }
1095
1096    async fn recover(&self) -> RpcResult<()> {
1097        unimplemented!()
1098    }
1099
1100    async fn apply_throttle(
1101        &self,
1102        _kind: PbThrottleTarget,
1103        _id: u32,
1104        _rate_limit: Option<u32>,
1105    ) -> RpcResult<()> {
1106        unimplemented!()
1107    }
1108
1109    async fn get_cluster_recovery_status(&self) -> RpcResult<RecoveryStatus> {
1110        Ok(RecoveryStatus::StatusRunning)
1111    }
1112
1113    async fn get_cluster_limits(&self) -> RpcResult<Vec<ClusterLimit>> {
1114        Ok(vec![])
1115    }
1116
1117    async fn list_rate_limits(&self) -> RpcResult<Vec<RateLimitInfo>> {
1118        Ok(vec![])
1119    }
1120
1121    async fn get_meta_store_endpoint(&self) -> RpcResult<String> {
1122        unimplemented!()
1123    }
1124}
1125
/// Sample proto3 schema (package `test`), compiled only in test builds.
///
/// It declares the messages `TestRecord`, `TestRecordAlterType`, `TestRecordExt`,
/// `Country` and `City`.
#[cfg(test)]
pub static PROTO_FILE_DATA: &str = r#"
    syntax = "proto3";
    package test;
    message TestRecord {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
    }
    message TestRecordAlterType {
        string id = 1;
        Country country = 3;
        int32 zipcode = 4;
        float rate = 5;
      }
    message TestRecordExt {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
      string name = 6;
    }
    message Country {
      string address = 1;
      City city = 2;
      string zipcode = 3;
    }
    message City {
      string address = 1;
      string zipcode = 2;
    }"#;
1158
1159/// Returns the file.
1160/// (`NamedTempFile` will automatically delete the file when it goes out of scope.)
1161pub fn create_proto_file(proto_data: &str) -> NamedTempFile {
1162    let in_file = Builder::new()
1163        .prefix("temp")
1164        .suffix(".proto")
1165        .rand_bytes(8)
1166        .tempfile()
1167        .unwrap();
1168
1169    let out_file = Builder::new()
1170        .prefix("temp")
1171        .suffix(".pb")
1172        .rand_bytes(8)
1173        .tempfile()
1174        .unwrap();
1175
1176    let mut file = in_file.as_file();
1177    file.write_all(proto_data.as_ref())
1178        .expect("writing binary to test file");
1179    file.flush().expect("flush temp file failed");
1180    let include_path = in_file
1181        .path()
1182        .parent()
1183        .unwrap()
1184        .to_string_lossy()
1185        .into_owned();
1186    let out_path = out_file.path().to_string_lossy().into_owned();
1187    let in_path = in_file.path().to_string_lossy().into_owned();
1188    let mut compile = std::process::Command::new("protoc");
1189
1190    let out = compile
1191        .arg("--include_imports")
1192        .arg("-I")
1193        .arg(include_path)
1194        .arg(format!("--descriptor_set_out={}", out_path))
1195        .arg(in_path)
1196        .output()
1197        .expect("failed to compile proto");
1198    if !out.status.success() {
1199        panic!("compile proto failed \n output: {:?}", out);
1200    }
1201    out_file
1202}