risingwave_frontend/
test_utils.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::io::Write;
17use std::net::{IpAddr, Ipv4Addr, SocketAddr};
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU32, Ordering};
20
21use futures_async_stream::for_await;
22use parking_lot::RwLock;
23use pgwire::net::{Address, AddressRef};
24use pgwire::pg_response::StatementType;
25use pgwire::pg_server::{BoxedError, SessionId, SessionManager, UserAuthenticator};
26use pgwire::types::Row;
27use risingwave_common::catalog::{
28    DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_ID,
29    FunctionId, IndexId, NON_RESERVED_USER_ID, ObjectId, PG_CATALOG_SCHEMA_NAME,
30    RW_CATALOG_SCHEMA_NAME, TableId,
31};
32use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat};
33use risingwave_common::session_config::SessionConfig;
34use risingwave_common::system_param::reader::SystemParamsReader;
35use risingwave_common::util::cluster_limit::ClusterLimit;
36use risingwave_common::util::column_index_mapping::ColIndexMapping;
37use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
38use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
39use risingwave_hummock_sdk::{HummockVersionId, INVALID_VERSION_ID};
40use risingwave_pb::backup_service::MetaSnapshotMetadata;
41use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
42use risingwave_pb::catalog::{
43    PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbStreamJobStatus,
44    PbSubscription, PbTable, PbView, Table,
45};
46use risingwave_pb::common::WorkerNode;
47use risingwave_pb::ddl_service::alter_owner_request::Object;
48use risingwave_pb::ddl_service::{
49    DdlProgress, PbTableJobType, ReplaceJobPlan, TableJobType, alter_name_request,
50    alter_set_schema_request, alter_swap_rename_request, create_connection_request,
51};
52use risingwave_pb::hummock::write_limits::WriteLimit;
53use risingwave_pb::hummock::{
54    BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
55};
56use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
57use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
58use risingwave_pb::meta::list_actor_states_response::ActorState;
59use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution;
60use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
61use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
62use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
63use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
64use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
65use risingwave_pb::meta::{
66    EventLog, PbTableParallelism, PbThrottleTarget, RecoveryStatus, SystemParams,
67};
68use risingwave_pb::secret::PbSecretRef;
69use risingwave_pb::stream_plan::StreamFragmentGraph;
70use risingwave_pb::user::update_user_request::UpdateField;
71use risingwave_pb::user::{GrantPrivilege, UserInfo};
72use risingwave_rpc_client::error::Result as RpcResult;
73use tempfile::{Builder, NamedTempFile};
74
75use crate::FrontendOpts;
76use crate::catalog::catalog_service::CatalogWriter;
77use crate::catalog::root_catalog::Catalog;
78use crate::catalog::{ConnectionId, DatabaseId, SchemaId, SecretId};
79use crate::error::{ErrorCode, Result};
80use crate::handler::RwPgResponse;
81use crate::meta_client::FrontendMetaClient;
82use crate::scheduler::HummockSnapshotManagerRef;
83use crate::session::{AuthContext, FrontendEnv, SessionImpl};
84use crate::user::UserId;
85use crate::user::user_manager::UserInfoManager;
86use crate::user::user_service::UserInfoWriter;
87
88/// An embedded frontend without starting meta and without starting frontend as a tcp server.
89pub struct LocalFrontend {
90    pub opts: FrontendOpts,
91    env: FrontendEnv,
92}
93
94impl SessionManager for LocalFrontend {
95    type Session = SessionImpl;
96
97    fn create_dummy_session(
98        &self,
99        _database_id: u32,
100        _user_name: u32,
101    ) -> std::result::Result<Arc<Self::Session>, BoxedError> {
102        unreachable!()
103    }
104
105    fn connect(
106        &self,
107        _database: &str,
108        _user_name: &str,
109        _peer_addr: AddressRef,
110    ) -> std::result::Result<Arc<Self::Session>, BoxedError> {
111        Ok(self.session_ref())
112    }
113
114    fn cancel_queries_in_session(&self, _session_id: SessionId) {
115        unreachable!()
116    }
117
118    fn cancel_creating_jobs_in_session(&self, _session_id: SessionId) {
119        unreachable!()
120    }
121
122    fn end_session(&self, _session: &Self::Session) {
123        unreachable!()
124    }
125}
126
127impl LocalFrontend {
128    #[expect(clippy::unused_async)]
129    pub async fn new(opts: FrontendOpts) -> Self {
130        let env = FrontendEnv::mock();
131        Self { opts, env }
132    }
133
134    pub async fn run_sql(
135        &self,
136        sql: impl Into<String>,
137    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
138        let sql: Arc<str> = Arc::from(sql.into());
139        self.session_ref().run_statement(sql, vec![]).await
140    }
141
142    pub async fn run_sql_with_session(
143        &self,
144        session_ref: Arc<SessionImpl>,
145        sql: impl Into<String>,
146    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
147        let sql: Arc<str> = Arc::from(sql.into());
148        session_ref.run_statement(sql, vec![]).await
149    }
150
151    pub async fn run_user_sql(
152        &self,
153        sql: impl Into<String>,
154        database: String,
155        user_name: String,
156        user_id: UserId,
157    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
158        let sql: Arc<str> = Arc::from(sql.into());
159        self.session_user_ref(database, user_name, user_id)
160            .run_statement(sql, vec![])
161            .await
162    }
163
164    pub async fn query_formatted_result(&self, sql: impl Into<String>) -> Vec<String> {
165        let mut rsp = self.run_sql(sql).await.unwrap();
166        let mut res = vec![];
167        #[for_await]
168        for row_set in rsp.values_stream() {
169            for row in row_set.unwrap() {
170                res.push(format!("{:?}", row));
171            }
172        }
173        res
174    }
175
176    pub async fn get_explain_output(&self, sql: impl Into<String>) -> String {
177        let mut rsp = self.run_sql(sql).await.unwrap();
178        assert_eq!(rsp.stmt_type(), StatementType::EXPLAIN);
179        let mut res = String::new();
180        #[for_await]
181        for row_set in rsp.values_stream() {
182            for row in row_set.unwrap() {
183                let row: Row = row;
184                let row = row.values()[0].as_ref().unwrap();
185                res += std::str::from_utf8(row).unwrap();
186                res += "\n";
187            }
188        }
189        res
190    }
191
192    /// Creates a new session
193    pub fn session_ref(&self) -> Arc<SessionImpl> {
194        self.session_user_ref(
195            DEFAULT_DATABASE_NAME.to_owned(),
196            DEFAULT_SUPER_USER.to_owned(),
197            DEFAULT_SUPER_USER_ID,
198        )
199    }
200
201    pub fn session_user_ref(
202        &self,
203        database: String,
204        user_name: String,
205        user_id: UserId,
206    ) -> Arc<SessionImpl> {
207        Arc::new(SessionImpl::new(
208            self.env.clone(),
209            AuthContext::new(database, user_name, user_id),
210            UserAuthenticator::None,
211            // Local Frontend use a non-sense id.
212            (0, 0),
213            Address::Tcp(SocketAddr::new(
214                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
215                6666,
216            ))
217            .into(),
218            Default::default(),
219        ))
220    }
221}
222
223pub async fn get_explain_output(mut rsp: RwPgResponse) -> String {
224    if rsp.stmt_type() != StatementType::EXPLAIN {
225        panic!("RESPONSE INVALID: {rsp:?}");
226    }
227    let mut res = String::new();
228    #[for_await]
229    for row_set in rsp.values_stream() {
230        for row in row_set.unwrap() {
231            let row: Row = row;
232            let row = row.values()[0].as_ref().unwrap();
233            res += std::str::from_utf8(row).unwrap();
234            res += "\n";
235        }
236    }
237    res
238}
239
240pub struct MockCatalogWriter {
241    catalog: Arc<RwLock<Catalog>>,
242    id: AtomicU32,
243    table_id_to_schema_id: RwLock<HashMap<u32, SchemaId>>,
244    schema_id_to_database_id: RwLock<HashMap<u32, DatabaseId>>,
245    hummock_snapshot_manager: HummockSnapshotManagerRef,
246}
247
248#[async_trait::async_trait]
249impl CatalogWriter for MockCatalogWriter {
250    async fn create_database(
251        &self,
252        db_name: &str,
253        owner: UserId,
254        resource_group: &str,
255    ) -> Result<()> {
256        let database_id = self.gen_id();
257        self.catalog.write().create_database(&PbDatabase {
258            name: db_name.to_owned(),
259            id: database_id,
260            owner,
261            resource_group: resource_group.to_owned(),
262        });
263        self.create_schema(database_id, DEFAULT_SCHEMA_NAME, owner)
264            .await?;
265        self.create_schema(database_id, PG_CATALOG_SCHEMA_NAME, owner)
266            .await?;
267        self.create_schema(database_id, RW_CATALOG_SCHEMA_NAME, owner)
268            .await?;
269        Ok(())
270    }
271
272    async fn create_schema(
273        &self,
274        db_id: DatabaseId,
275        schema_name: &str,
276        owner: UserId,
277    ) -> Result<()> {
278        let id = self.gen_id();
279        self.catalog.write().create_schema(&PbSchema {
280            id,
281            name: schema_name.to_owned(),
282            database_id: db_id,
283            owner,
284        });
285        self.add_schema_id(id, db_id);
286        Ok(())
287    }
288
289    async fn create_materialized_view(
290        &self,
291        mut table: PbTable,
292        _graph: StreamFragmentGraph,
293        _dependencies: HashSet<ObjectId>,
294        _specific_resource_group: Option<String>,
295    ) -> Result<()> {
296        table.id = self.gen_id();
297        table.stream_job_status = PbStreamJobStatus::Created as _;
298        table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
299        self.catalog.write().create_table(&table);
300        self.add_table_or_source_id(table.id, table.schema_id, table.database_id);
301        self.hummock_snapshot_manager
302            .add_table_for_test(TableId::new(table.id));
303        Ok(())
304    }
305
306    async fn create_view(&self, mut view: PbView) -> Result<()> {
307        view.id = self.gen_id();
308        self.catalog.write().create_view(&view);
309        self.add_table_or_source_id(view.id, view.schema_id, view.database_id);
310        Ok(())
311    }
312
313    async fn create_table(
314        &self,
315        source: Option<PbSource>,
316        mut table: PbTable,
317        graph: StreamFragmentGraph,
318        _job_type: PbTableJobType,
319    ) -> Result<()> {
320        if let Some(source) = source {
321            let source_id = self.create_source_inner(source)?;
322            table.optional_associated_source_id =
323                Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id));
324        }
325        self.create_materialized_view(table, graph, HashSet::new(), None)
326            .await?;
327        Ok(())
328    }
329
330    async fn replace_table(
331        &self,
332        _source: Option<PbSource>,
333        mut table: PbTable,
334        _graph: StreamFragmentGraph,
335        _mapping: ColIndexMapping,
336        _job_type: TableJobType,
337    ) -> Result<()> {
338        table.stream_job_status = PbStreamJobStatus::Created as _;
339        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
340        self.catalog.write().update_table(&table);
341        Ok(())
342    }
343
344    async fn replace_source(
345        &self,
346        source: PbSource,
347        _graph: StreamFragmentGraph,
348        _mapping: ColIndexMapping,
349    ) -> Result<()> {
350        self.catalog.write().update_source(&source);
351        Ok(())
352    }
353
354    async fn create_source(
355        &self,
356        source: PbSource,
357        _graph: Option<StreamFragmentGraph>,
358    ) -> Result<()> {
359        self.create_source_inner(source).map(|_| ())
360    }
361
362    async fn create_sink(
363        &self,
364        sink: PbSink,
365        graph: StreamFragmentGraph,
366        _affected_table_change: Option<ReplaceJobPlan>,
367        _dependencies: HashSet<ObjectId>,
368    ) -> Result<()> {
369        self.create_sink_inner(sink, graph)
370    }
371
372    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
373        self.create_subscription_inner(subscription)
374    }
375
376    async fn create_index(
377        &self,
378        mut index: PbIndex,
379        mut index_table: PbTable,
380        _graph: StreamFragmentGraph,
381    ) -> Result<()> {
382        index_table.id = self.gen_id();
383        index_table.stream_job_status = PbStreamJobStatus::Created as _;
384        index_table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
385        self.catalog.write().create_table(&index_table);
386        self.add_table_or_index_id(
387            index_table.id,
388            index_table.schema_id,
389            index_table.database_id,
390        );
391
392        index.id = index_table.id;
393        index.index_table_id = index_table.id;
394        self.catalog.write().create_index(&index);
395        Ok(())
396    }
397
398    async fn create_function(&self, _function: PbFunction) -> Result<()> {
399        unreachable!()
400    }
401
402    async fn create_connection(
403        &self,
404        _connection_name: String,
405        _database_id: u32,
406        _schema_id: u32,
407        _owner_id: u32,
408        _connection: create_connection_request::Payload,
409    ) -> Result<()> {
410        unreachable!()
411    }
412
413    async fn create_secret(
414        &self,
415        _secret_name: String,
416        _database_id: u32,
417        _schema_id: u32,
418        _owner_id: u32,
419        _payload: Vec<u8>,
420    ) -> Result<()> {
421        unreachable!()
422    }
423
424    async fn comment_on(&self, _comment: PbComment) -> Result<()> {
425        unreachable!()
426    }
427
428    async fn drop_table(
429        &self,
430        source_id: Option<u32>,
431        table_id: TableId,
432        cascade: bool,
433    ) -> Result<()> {
434        if cascade {
435            return Err(ErrorCode::NotSupported(
436                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
437                "use drop instead".to_owned(),
438            )
439            .into());
440        }
441        if let Some(source_id) = source_id {
442            self.drop_table_or_source_id(source_id);
443        }
444        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.table_id);
445        let indexes =
446            self.catalog
447                .read()
448                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
449        for index in indexes {
450            self.drop_index(index.id, cascade).await?;
451        }
452        self.catalog
453            .write()
454            .drop_table(database_id, schema_id, table_id);
455        if let Some(source_id) = source_id {
456            self.catalog
457                .write()
458                .drop_source(database_id, schema_id, source_id);
459        }
460        Ok(())
461    }
462
463    async fn drop_view(&self, _view_id: u32, _cascade: bool) -> Result<()> {
464        unreachable!()
465    }
466
467    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
468        if cascade {
469            return Err(ErrorCode::NotSupported(
470                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
471                "use drop instead".to_owned(),
472            )
473            .into());
474        }
475        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.table_id);
476        let indexes =
477            self.catalog
478                .read()
479                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
480        for index in indexes {
481            self.drop_index(index.id, cascade).await?;
482        }
483        self.catalog
484            .write()
485            .drop_table(database_id, schema_id, table_id);
486        Ok(())
487    }
488
489    async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<()> {
490        if cascade {
491            return Err(ErrorCode::NotSupported(
492                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
493                "use drop instead".to_owned(),
494            )
495            .into());
496        }
497        let (database_id, schema_id) = self.drop_table_or_source_id(source_id);
498        self.catalog
499            .write()
500            .drop_source(database_id, schema_id, source_id);
501        Ok(())
502    }
503
504    async fn drop_sink(
505        &self,
506        sink_id: u32,
507        cascade: bool,
508        _target_table_change: Option<ReplaceJobPlan>,
509    ) -> Result<()> {
510        if cascade {
511            return Err(ErrorCode::NotSupported(
512                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
513                "use drop instead".to_owned(),
514            )
515            .into());
516        }
517        let (database_id, schema_id) = self.drop_table_or_sink_id(sink_id);
518        self.catalog
519            .write()
520            .drop_sink(database_id, schema_id, sink_id);
521        Ok(())
522    }
523
524    async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()> {
525        if cascade {
526            return Err(ErrorCode::NotSupported(
527                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
528                "use drop instead".to_owned(),
529            )
530            .into());
531        }
532        let (database_id, schema_id) = self.drop_table_or_subscription_id(subscription_id);
533        self.catalog
534            .write()
535            .drop_subscription(database_id, schema_id, subscription_id);
536        Ok(())
537    }
538
539    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
540        if cascade {
541            return Err(ErrorCode::NotSupported(
542                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
543                "use drop instead".to_owned(),
544            )
545            .into());
546        }
547        let &schema_id = self
548            .table_id_to_schema_id
549            .read()
550            .get(&index_id.index_id)
551            .unwrap();
552        let database_id = self.get_database_id_by_schema(schema_id);
553
554        let index = {
555            let catalog_reader = self.catalog.read();
556            let schema_catalog = catalog_reader
557                .get_schema_by_id(&database_id, &schema_id)
558                .unwrap();
559            schema_catalog.get_index_by_id(&index_id).unwrap().clone()
560        };
561
562        let index_table_id = index.index_table.id;
563        let (database_id, schema_id) = self.drop_table_or_index_id(index_id.index_id);
564        self.catalog
565            .write()
566            .drop_index(database_id, schema_id, index_id);
567        self.catalog
568            .write()
569            .drop_table(database_id, schema_id, index_table_id);
570        Ok(())
571    }
572
573    async fn drop_function(&self, _function_id: FunctionId) -> Result<()> {
574        unreachable!()
575    }
576
577    async fn drop_connection(&self, _connection_id: ConnectionId) -> Result<()> {
578        unreachable!()
579    }
580
581    async fn drop_secret(&self, _secret_id: SecretId) -> Result<()> {
582        unreachable!()
583    }
584
585    async fn drop_database(&self, database_id: u32) -> Result<()> {
586        self.catalog.write().drop_database(database_id);
587        Ok(())
588    }
589
590    async fn drop_schema(&self, schema_id: u32, _cascade: bool) -> Result<()> {
591        let database_id = self.drop_schema_id(schema_id);
592        self.catalog.write().drop_schema(database_id, schema_id);
593        Ok(())
594    }
595
596    async fn alter_name(
597        &self,
598        object_id: alter_name_request::Object,
599        object_name: &str,
600    ) -> Result<()> {
601        match object_id {
602            alter_name_request::Object::TableId(table_id) => {
603                self.catalog
604                    .write()
605                    .alter_table_name_by_id(&table_id.into(), object_name);
606                Ok(())
607            }
608            _ => {
609                unimplemented!()
610            }
611        }
612    }
613
614    async fn alter_source(&self, source: PbSource) -> Result<()> {
615        self.catalog.write().update_source(&source);
616        Ok(())
617    }
618
619    async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<()> {
620        for database in self.catalog.read().iter_databases() {
621            for schema in database.iter_schemas() {
622                match object {
623                    Object::TableId(table_id) => {
624                        if let Some(table) =
625                            schema.get_created_table_by_id(&TableId::from(table_id))
626                        {
627                            let mut pb_table = table.to_prost();
628                            pb_table.owner = owner_id;
629                            self.catalog.write().update_table(&pb_table);
630                            return Ok(());
631                        }
632                    }
633                    _ => unreachable!(),
634                }
635            }
636        }
637
638        Err(ErrorCode::ItemNotFound(format!("object not found: {:?}", object)).into())
639    }
640
641    async fn alter_set_schema(
642        &self,
643        object: alter_set_schema_request::Object,
644        new_schema_id: u32,
645    ) -> Result<()> {
646        match object {
647            alter_set_schema_request::Object::TableId(table_id) => {
648                let mut pb_table = {
649                    let reader = self.catalog.read();
650                    let table = reader.get_any_table_by_id(&table_id.into())?.to_owned();
651                    table.to_prost()
652                };
653                pb_table.schema_id = new_schema_id;
654                self.catalog.write().update_table(&pb_table);
655                self.table_id_to_schema_id
656                    .write()
657                    .insert(table_id, new_schema_id);
658                Ok(())
659            }
660            _ => unreachable!(),
661        }
662    }
663
664    async fn alter_parallelism(
665        &self,
666        _table_id: u32,
667        _parallelism: PbTableParallelism,
668        _deferred: bool,
669    ) -> Result<()> {
670        todo!()
671    }
672
673    async fn alter_swap_rename(&self, _object: alter_swap_rename_request::Object) -> Result<()> {
674        todo!()
675    }
676
677    async fn alter_secret(
678        &self,
679        _secret_id: u32,
680        _secret_name: String,
681        _database_id: u32,
682        _schema_id: u32,
683        _owner_id: u32,
684        _payload: Vec<u8>,
685    ) -> Result<()> {
686        unreachable!()
687    }
688
689    async fn alter_resource_group(
690        &self,
691        _table_id: u32,
692        _resource_group: Option<String>,
693        _deferred: bool,
694    ) -> Result<()> {
695        todo!()
696    }
697}
698
699impl MockCatalogWriter {
700    pub fn new(
701        catalog: Arc<RwLock<Catalog>>,
702        hummock_snapshot_manager: HummockSnapshotManagerRef,
703    ) -> Self {
704        catalog.write().create_database(&PbDatabase {
705            id: 0,
706            name: DEFAULT_DATABASE_NAME.to_owned(),
707            owner: DEFAULT_SUPER_USER_ID,
708            resource_group: DEFAULT_RESOURCE_GROUP.to_owned(),
709        });
710        catalog.write().create_schema(&PbSchema {
711            id: 1,
712            name: DEFAULT_SCHEMA_NAME.to_owned(),
713            database_id: 0,
714            owner: DEFAULT_SUPER_USER_ID,
715        });
716        catalog.write().create_schema(&PbSchema {
717            id: 2,
718            name: PG_CATALOG_SCHEMA_NAME.to_owned(),
719            database_id: 0,
720            owner: DEFAULT_SUPER_USER_ID,
721        });
722        catalog.write().create_schema(&PbSchema {
723            id: 3,
724            name: RW_CATALOG_SCHEMA_NAME.to_owned(),
725            database_id: 0,
726            owner: DEFAULT_SUPER_USER_ID,
727        });
728        let mut map: HashMap<u32, DatabaseId> = HashMap::new();
729        map.insert(1_u32, 0_u32);
730        map.insert(2_u32, 0_u32);
731        map.insert(3_u32, 0_u32);
732        Self {
733            catalog,
734            id: AtomicU32::new(3),
735            table_id_to_schema_id: Default::default(),
736            schema_id_to_database_id: RwLock::new(map),
737            hummock_snapshot_manager,
738        }
739    }
740
741    fn gen_id(&self) -> u32 {
742        // Since the 0 value is `dev` schema and database, so jump out the 0 value.
743        self.id.fetch_add(1, Ordering::SeqCst) + 1
744    }
745
746    fn add_table_or_source_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
747        self.table_id_to_schema_id
748            .write()
749            .insert(table_id, schema_id);
750    }
751
752    fn drop_table_or_source_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
753        let schema_id = self
754            .table_id_to_schema_id
755            .write()
756            .remove(&table_id)
757            .unwrap();
758        (self.get_database_id_by_schema(schema_id), schema_id)
759    }
760
761    fn add_table_or_sink_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
762        self.table_id_to_schema_id
763            .write()
764            .insert(table_id, schema_id);
765    }
766
767    fn add_table_or_subscription_id(
768        &self,
769        table_id: u32,
770        schema_id: SchemaId,
771        _database_id: DatabaseId,
772    ) {
773        self.table_id_to_schema_id
774            .write()
775            .insert(table_id, schema_id);
776    }
777
778    fn add_table_or_index_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
779        self.table_id_to_schema_id
780            .write()
781            .insert(table_id, schema_id);
782    }
783
784    fn drop_table_or_sink_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
785        let schema_id = self
786            .table_id_to_schema_id
787            .write()
788            .remove(&table_id)
789            .unwrap();
790        (self.get_database_id_by_schema(schema_id), schema_id)
791    }
792
793    fn drop_table_or_subscription_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
794        let schema_id = self
795            .table_id_to_schema_id
796            .write()
797            .remove(&table_id)
798            .unwrap();
799        (self.get_database_id_by_schema(schema_id), schema_id)
800    }
801
802    fn drop_table_or_index_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
803        let schema_id = self
804            .table_id_to_schema_id
805            .write()
806            .remove(&table_id)
807            .unwrap();
808        (self.get_database_id_by_schema(schema_id), schema_id)
809    }
810
811    fn add_schema_id(&self, schema_id: u32, database_id: DatabaseId) {
812        self.schema_id_to_database_id
813            .write()
814            .insert(schema_id, database_id);
815    }
816
817    fn drop_schema_id(&self, schema_id: u32) -> DatabaseId {
818        self.schema_id_to_database_id
819            .write()
820            .remove(&schema_id)
821            .unwrap()
822    }
823
824    fn create_source_inner(&self, mut source: PbSource) -> Result<u32> {
825        source.id = self.gen_id();
826        self.catalog.write().create_source(&source);
827        self.add_table_or_source_id(source.id, source.schema_id, source.database_id);
828        Ok(source.id)
829    }
830
831    fn create_sink_inner(&self, mut sink: PbSink, _graph: StreamFragmentGraph) -> Result<()> {
832        sink.id = self.gen_id();
833        self.catalog.write().create_sink(&sink);
834        self.add_table_or_sink_id(sink.id, sink.schema_id, sink.database_id);
835        Ok(())
836    }
837
838    fn create_subscription_inner(&self, mut subscription: PbSubscription) -> Result<()> {
839        subscription.id = self.gen_id();
840        self.catalog.write().create_subscription(&subscription);
841        self.add_table_or_subscription_id(
842            subscription.id,
843            subscription.schema_id,
844            subscription.database_id,
845        );
846        Ok(())
847    }
848
849    fn get_database_id_by_schema(&self, schema_id: u32) -> DatabaseId {
850        *self
851            .schema_id_to_database_id
852            .read()
853            .get(&schema_id)
854            .unwrap()
855    }
856}
857
858pub struct MockUserInfoWriter {
859    id: AtomicU32,
860    user_info: Arc<RwLock<UserInfoManager>>,
861}
862
863#[async_trait::async_trait]
864impl UserInfoWriter for MockUserInfoWriter {
865    async fn create_user(&self, user: UserInfo) -> Result<()> {
866        let mut user = user;
867        user.id = self.gen_id();
868        self.user_info.write().create_user(user);
869        Ok(())
870    }
871
872    async fn drop_user(&self, id: UserId) -> Result<()> {
873        self.user_info.write().drop_user(id);
874        Ok(())
875    }
876
877    async fn update_user(
878        &self,
879        update_user: UserInfo,
880        update_fields: Vec<UpdateField>,
881    ) -> Result<()> {
882        let mut lock = self.user_info.write();
883        let id = update_user.get_id();
884        let Some(old_name) = lock.get_user_name_by_id(id) else {
885            return Ok(());
886        };
887        let mut user_info = lock.get_user_by_name(&old_name).unwrap().to_prost();
888        update_fields.into_iter().for_each(|field| match field {
889            UpdateField::Super => user_info.is_super = update_user.is_super,
890            UpdateField::Login => user_info.can_login = update_user.can_login,
891            UpdateField::CreateDb => user_info.can_create_db = update_user.can_create_db,
892            UpdateField::CreateUser => user_info.can_create_user = update_user.can_create_user,
893            UpdateField::AuthInfo => user_info.auth_info.clone_from(&update_user.auth_info),
894            UpdateField::Rename => user_info.name.clone_from(&update_user.name),
895            UpdateField::Unspecified => unreachable!(),
896        });
897        lock.update_user(update_user);
898        Ok(())
899    }
900
901    /// In `MockUserInfoWriter`, we don't support expand privilege with `GrantAllTables` and
902    /// `GrantAllSources` when grant privilege to user.
903    async fn grant_privilege(
904        &self,
905        users: Vec<UserId>,
906        privileges: Vec<GrantPrivilege>,
907        with_grant_option: bool,
908        _grantor: UserId,
909    ) -> Result<()> {
910        let privileges = privileges
911            .into_iter()
912            .map(|mut p| {
913                p.action_with_opts
914                    .iter_mut()
915                    .for_each(|ao| ao.with_grant_option = with_grant_option);
916                p
917            })
918            .collect::<Vec<_>>();
919        for user_id in users {
920            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
921                u.extend_privileges(privileges.clone());
922            }
923        }
924        Ok(())
925    }
926
927    /// In `MockUserInfoWriter`, we don't support expand privilege with `RevokeAllTables` and
928    /// `RevokeAllSources` when revoke privilege from user.
929    async fn revoke_privilege(
930        &self,
931        users: Vec<UserId>,
932        privileges: Vec<GrantPrivilege>,
933        _granted_by: UserId,
934        _revoke_by: UserId,
935        revoke_grant_option: bool,
936        _cascade: bool,
937    ) -> Result<()> {
938        for user_id in users {
939            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
940                u.revoke_privileges(privileges.clone(), revoke_grant_option);
941            }
942        }
943        Ok(())
944    }
945}
946
947impl MockUserInfoWriter {
948    pub fn new(user_info: Arc<RwLock<UserInfoManager>>) -> Self {
949        user_info.write().create_user(UserInfo {
950            id: DEFAULT_SUPER_USER_ID,
951            name: DEFAULT_SUPER_USER.to_owned(),
952            is_super: true,
953            can_create_db: true,
954            can_create_user: true,
955            can_login: true,
956            ..Default::default()
957        });
958        Self {
959            user_info,
960            id: AtomicU32::new(NON_RESERVED_USER_ID as u32),
961        }
962    }
963
964    fn gen_id(&self) -> u32 {
965        self.id.fetch_add(1, Ordering::SeqCst)
966    }
967}
968
/// Stateless mock of `FrontendMetaClient`: its methods either return empty/default values
/// or panic via `unimplemented!()`.
pub struct MockFrontendMetaClient {}
970
971#[async_trait::async_trait]
972impl FrontendMetaClient for MockFrontendMetaClient {
973    async fn try_unregister(&self) {}
974
975    async fn flush(&self, _database_id: DatabaseId) -> RpcResult<HummockVersionId> {
976        Ok(INVALID_VERSION_ID)
977    }
978
979    async fn wait(&self) -> RpcResult<()> {
980        Ok(())
981    }
982
983    async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult<Vec<u32>> {
984        Ok(vec![])
985    }
986
987    async fn list_table_fragments(
988        &self,
989        _table_ids: &[u32],
990    ) -> RpcResult<HashMap<u32, TableFragmentInfo>> {
991        Ok(HashMap::default())
992    }
993
994    async fn list_streaming_job_states(&self) -> RpcResult<Vec<StreamingJobState>> {
995        Ok(vec![])
996    }
997
998    async fn list_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
999        Ok(vec![])
1000    }
1001
1002    async fn list_actor_states(&self) -> RpcResult<Vec<ActorState>> {
1003        Ok(vec![])
1004    }
1005
1006    async fn list_actor_splits(&self) -> RpcResult<Vec<ActorSplit>> {
1007        Ok(vec![])
1008    }
1009
1010    async fn list_object_dependencies(&self) -> RpcResult<Vec<PbObjectDependencies>> {
1011        Ok(vec![])
1012    }
1013
1014    async fn list_meta_snapshots(&self) -> RpcResult<Vec<MetaSnapshotMetadata>> {
1015        Ok(vec![])
1016    }
1017
1018    async fn get_system_params(&self) -> RpcResult<SystemParamsReader> {
1019        Ok(SystemParams::default().into())
1020    }
1021
1022    async fn set_system_param(
1023        &self,
1024        _param: String,
1025        _value: Option<String>,
1026    ) -> RpcResult<Option<SystemParamsReader>> {
1027        Ok(Some(SystemParams::default().into()))
1028    }
1029
1030    async fn get_session_params(&self) -> RpcResult<SessionConfig> {
1031        Ok(Default::default())
1032    }
1033
1034    async fn set_session_param(&self, _param: String, _value: Option<String>) -> RpcResult<String> {
1035        Ok("".to_owned())
1036    }
1037
1038    async fn get_ddl_progress(&self) -> RpcResult<Vec<DdlProgress>> {
1039        Ok(vec![])
1040    }
1041
1042    async fn get_tables(
1043        &self,
1044        _table_ids: &[u32],
1045        _include_dropped_tables: bool,
1046    ) -> RpcResult<HashMap<u32, Table>> {
1047        Ok(HashMap::new())
1048    }
1049
1050    async fn list_hummock_pinned_versions(&self) -> RpcResult<Vec<(u32, u64)>> {
1051        unimplemented!()
1052    }
1053
1054    async fn get_hummock_current_version(&self) -> RpcResult<HummockVersion> {
1055        Ok(HummockVersion::default())
1056    }
1057
1058    async fn get_hummock_checkpoint_version(&self) -> RpcResult<HummockVersion> {
1059        unimplemented!()
1060    }
1061
1062    async fn list_version_deltas(&self) -> RpcResult<Vec<HummockVersionDelta>> {
1063        unimplemented!()
1064    }
1065
1066    async fn list_branched_objects(&self) -> RpcResult<Vec<BranchedObject>> {
1067        unimplemented!()
1068    }
1069
1070    async fn list_hummock_compaction_group_configs(&self) -> RpcResult<Vec<CompactionGroupInfo>> {
1071        unimplemented!()
1072    }
1073
1074    async fn list_hummock_active_write_limits(&self) -> RpcResult<HashMap<u64, WriteLimit>> {
1075        unimplemented!()
1076    }
1077
1078    async fn list_hummock_meta_configs(&self) -> RpcResult<HashMap<String, String>> {
1079        unimplemented!()
1080    }
1081
1082    async fn list_event_log(&self) -> RpcResult<Vec<EventLog>> {
1083        unimplemented!()
1084    }
1085
1086    async fn list_compact_task_assignment(&self) -> RpcResult<Vec<CompactTaskAssignment>> {
1087        unimplemented!()
1088    }
1089
1090    async fn list_all_nodes(&self) -> RpcResult<Vec<WorkerNode>> {
1091        Ok(vec![])
1092    }
1093
1094    async fn list_compact_task_progress(&self) -> RpcResult<Vec<CompactTaskProgress>> {
1095        unimplemented!()
1096    }
1097
1098    async fn recover(&self) -> RpcResult<()> {
1099        unimplemented!()
1100    }
1101
1102    async fn apply_throttle(
1103        &self,
1104        _kind: PbThrottleTarget,
1105        _id: u32,
1106        _rate_limit: Option<u32>,
1107    ) -> RpcResult<()> {
1108        unimplemented!()
1109    }
1110
1111    async fn get_cluster_recovery_status(&self) -> RpcResult<RecoveryStatus> {
1112        Ok(RecoveryStatus::StatusRunning)
1113    }
1114
1115    async fn get_cluster_limits(&self) -> RpcResult<Vec<ClusterLimit>> {
1116        Ok(vec![])
1117    }
1118
1119    async fn list_rate_limits(&self) -> RpcResult<Vec<RateLimitInfo>> {
1120        Ok(vec![])
1121    }
1122
1123    async fn get_meta_store_endpoint(&self) -> RpcResult<String> {
1124        unimplemented!()
1125    }
1126
1127    async fn alter_sink_props(
1128        &self,
1129        _sink_id: u32,
1130        _changed_props: BTreeMap<String, String>,
1131        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1132        _connector_conn_ref: Option<u32>,
1133    ) -> RpcResult<()> {
1134        unimplemented!()
1135    }
1136
1137    async fn list_hosted_iceberg_tables(&self) -> RpcResult<Vec<IcebergTable>> {
1138        unimplemented!()
1139    }
1140}
1141
/// Proto3 schema text (package `test`) used as a fixture; only compiled in test builds.
///
/// It declares `TestRecord` plus two variants of it: `TestRecordAlterType` (where `id` is a
/// `string` and `zipcode` an `int32`) and `TestRecordExt` (which adds a `name` field). It
/// also declares the nested `Country` and `City` messages they reference.
#[cfg(test)]
pub static PROTO_FILE_DATA: &str = r#"
    syntax = "proto3";
    package test;
    message TestRecord {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
    }
    message TestRecordAlterType {
        string id = 1;
        Country country = 3;
        int32 zipcode = 4;
        float rate = 5;
      }
    message TestRecordExt {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
      string name = 6;
    }
    message Country {
      string address = 1;
      City city = 2;
      string zipcode = 3;
    }
    message City {
      string address = 1;
      string zipcode = 2;
    }"#;
1174
1175/// Returns the file.
1176/// (`NamedTempFile` will automatically delete the file when it goes out of scope.)
1177pub fn create_proto_file(proto_data: &str) -> NamedTempFile {
1178    let in_file = Builder::new()
1179        .prefix("temp")
1180        .suffix(".proto")
1181        .rand_bytes(8)
1182        .tempfile()
1183        .unwrap();
1184
1185    let out_file = Builder::new()
1186        .prefix("temp")
1187        .suffix(".pb")
1188        .rand_bytes(8)
1189        .tempfile()
1190        .unwrap();
1191
1192    let mut file = in_file.as_file();
1193    file.write_all(proto_data.as_ref())
1194        .expect("writing binary to test file");
1195    file.flush().expect("flush temp file failed");
1196    let include_path = in_file
1197        .path()
1198        .parent()
1199        .unwrap()
1200        .to_string_lossy()
1201        .into_owned();
1202    let out_path = out_file.path().to_string_lossy().into_owned();
1203    let in_path = in_file.path().to_string_lossy().into_owned();
1204    let mut compile = std::process::Command::new("protoc");
1205
1206    let out = compile
1207        .arg("--include_imports")
1208        .arg("-I")
1209        .arg(include_path)
1210        .arg(format!("--descriptor_set_out={}", out_path))
1211        .arg(in_path)
1212        .output()
1213        .expect("failed to compile proto");
1214    if !out.status.success() {
1215        panic!("compile proto failed \n output: {:?}", out);
1216    }
1217    out_file
1218}