// risingwave_frontend/test_utils.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::io::Write;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};

use futures_async_stream::for_await;
use parking_lot::RwLock;
use pgwire::net::{Address, AddressRef};
use pgwire::pg_response::StatementType;
use pgwire::pg_server::{BoxedError, SessionId, SessionManager, UserAuthenticator};
use pgwire::types::Row;
use risingwave_common::catalog::{
    AlterDatabaseParam, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER,
    DEFAULT_SUPER_USER_FOR_ADMIN, DEFAULT_SUPER_USER_FOR_ADMIN_ID, DEFAULT_SUPER_USER_ID,
    FunctionId, IndexId, NON_RESERVED_USER_ID, ObjectId, PG_CATALOG_SCHEMA_NAME,
    RW_CATALOG_SCHEMA_NAME, TableId,
};
use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat};
use risingwave_common::id::{ConnectionId, JobId, SourceId, SubscriptionId, ViewId, WorkerId};
use risingwave_common::session_config::SessionConfig;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::util::cluster_limit::ClusterLimit;
use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{HummockVersionId, INVALID_VERSION_ID};
use risingwave_pb::backup_service::MetaSnapshotMetadata;
use risingwave_pb::catalog::{
    PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbStreamJobStatus,
    PbSubscription, PbTable, PbView, Table,
};
use risingwave_pb::common::WorkerNode;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
use risingwave_pb::ddl_service::{
    DdlProgress, PbTableJobType, TableJobType, alter_name_request, alter_set_schema_request,
    alter_swap_rename_request, create_connection_request,
};
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::{
    BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
};
use risingwave_pb::id::ActorId;
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::{
    EventLog, FragmentDistribution, PbTableParallelism, PbThrottleTarget, RecoveryStatus,
    RefreshRequest, RefreshResponse, SystemParams,
};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
use risingwave_pb::user::update_user_request::UpdateField;
use risingwave_pb::user::{GrantPrivilege, UserInfo};
use risingwave_rpc_client::error::Result as RpcResult;
use tempfile::{Builder, NamedTempFile};

use crate::FrontendOpts;
use crate::catalog::catalog_service::CatalogWriter;
use crate::catalog::root_catalog::Catalog;
use crate::catalog::{DatabaseId, FragmentId, SchemaId, SecretId, SinkId};
use crate::error::{ErrorCode, Result};
use crate::handler::RwPgResponse;
use crate::meta_client::FrontendMetaClient;
use crate::scheduler::HummockSnapshotManagerRef;
use crate::session::{AuthContext, FrontendEnv, SessionImpl};
use crate::user::UserId;
use crate::user::user_manager::UserInfoManager;
use crate::user::user_service::UserInfoWriter;

/// An embedded frontend for tests: it starts neither a meta node nor a frontend TCP server.
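///
/// A minimal usage sketch (marked `ignore` since it needs an async runtime; the
/// `FrontendOpts::default()` construction is an assumption that holds in tests):
///
/// ```ignore
/// let frontend = LocalFrontend::new(FrontendOpts::default()).await;
/// frontend.run_sql("CREATE TABLE t (v INT)").await.unwrap();
/// let plan = frontend.get_explain_output("EXPLAIN SELECT v FROM t").await;
/// ```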
pub struct LocalFrontend {
    pub opts: FrontendOpts,
    env: FrontendEnv,
}

impl SessionManager for LocalFrontend {
    type Session = SessionImpl;

    fn create_dummy_session(
        &self,
        _database_id: DatabaseId,
        _user_name: u32,
    ) -> std::result::Result<Arc<Self::Session>, BoxedError> {
        unreachable!()
    }

    fn connect(
        &self,
        _database: &str,
        _user_name: &str,
        _peer_addr: AddressRef,
    ) -> std::result::Result<Arc<Self::Session>, BoxedError> {
        Ok(self.session_ref())
    }

    fn cancel_queries_in_session(&self, _session_id: SessionId) {
        unreachable!()
    }

    fn cancel_creating_jobs_in_session(&self, _session_id: SessionId) {
        unreachable!()
    }

    fn end_session(&self, _session: &Self::Session) {
        unreachable!()
    }
}

impl LocalFrontend {
    #[expect(clippy::unused_async)]
    pub async fn new(opts: FrontendOpts) -> Self {
        let env = FrontendEnv::mock();
        Self { opts, env }
    }

    pub async fn run_sql(
        &self,
        sql: impl Into<String>,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        self.session_ref().run_statement(sql, vec![]).await
    }

    pub async fn run_sql_with_session(
        &self,
        session_ref: Arc<SessionImpl>,
        sql: impl Into<String>,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        session_ref.run_statement(sql, vec![]).await
    }

    pub async fn run_user_sql(
        &self,
        sql: impl Into<String>,
        database: String,
        user_name: String,
        user_id: UserId,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        self.session_user_ref(database, user_name, user_id)
            .run_statement(sql, vec![])
            .await
    }

    pub async fn query_formatted_result(&self, sql: impl Into<String>) -> Vec<String> {
        let mut rsp = self.run_sql(sql).await.unwrap();
        let mut res = vec![];
        #[for_await]
        for row_set in rsp.values_stream() {
            for row in row_set.unwrap() {
                res.push(format!("{:?}", row));
            }
        }
        res
    }

    pub async fn get_explain_output(&self, sql: impl Into<String>) -> String {
        let mut rsp = self.run_sql(sql).await.unwrap();
        assert_eq!(rsp.stmt_type(), StatementType::EXPLAIN);
        let mut res = String::new();
        #[for_await]
        for row_set in rsp.values_stream() {
            for row in row_set.unwrap() {
                let row: Row = row;
                let row = row.values()[0].as_ref().unwrap();
                res += std::str::from_utf8(row).unwrap();
                res += "\n";
            }
        }
        res
    }

    /// Creates a new session as the default superuser on the default database.
    pub fn session_ref(&self) -> Arc<SessionImpl> {
        self.session_user_ref(
            DEFAULT_DATABASE_NAME.to_owned(),
            DEFAULT_SUPER_USER.to_owned(),
            DEFAULT_SUPER_USER_ID,
        )
    }

    pub fn session_user_ref(
        &self,
        database: String,
        user_name: String,
        user_id: UserId,
    ) -> Arc<SessionImpl> {
        Arc::new(SessionImpl::new(
            self.env.clone(),
            AuthContext::new(database, user_name, user_id),
            UserAuthenticator::None,
            // The local frontend uses a placeholder session id.
            (0, 0),
            Address::Tcp(SocketAddr::new(
                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
                6666,
            ))
            .into(),
            Default::default(),
        ))
    }
}

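/// Collects the rows of an `EXPLAIN` response into a single newline-separated
/// string, panicking if the response is not an `EXPLAIN` result.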
pub async fn get_explain_output(mut rsp: RwPgResponse) -> String {
    if rsp.stmt_type() != StatementType::EXPLAIN {
        panic!("RESPONSE INVALID: {rsp:?}");
    }
    let mut res = String::new();
    #[for_await]
    for row_set in rsp.values_stream() {
        for row in row_set.unwrap() {
            let row: Row = row;
            let row = row.values()[0].as_ref().unwrap();
            res += std::str::from_utf8(row).unwrap();
            res += "\n";
        }
    }
    res
}

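/// A [`CatalogWriter`] that applies DDL directly to an in-memory [`Catalog`],
/// bypassing the meta service entirely. Object ids come from a local counter,
/// and the two side maps track which schema (and thus which database) each
/// object id belongs to, so that the `drop_*` methods can locate their targets.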
pub struct MockCatalogWriter {
    catalog: Arc<RwLock<Catalog>>,
    id: AtomicU32,
    table_id_to_schema_id: RwLock<HashMap<u32, SchemaId>>,
    schema_id_to_database_id: RwLock<HashMap<SchemaId, DatabaseId>>,
    hummock_snapshot_manager: HummockSnapshotManagerRef,
}

#[async_trait::async_trait]
impl CatalogWriter for MockCatalogWriter {
    async fn create_database(
        &self,
        db_name: &str,
        owner: UserId,
        resource_group: &str,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
    ) -> Result<()> {
        let database_id = DatabaseId::new(self.gen_id());
        self.catalog.write().create_database(&PbDatabase {
            name: db_name.to_owned(),
            id: database_id,
            owner,
            resource_group: resource_group.to_owned(),
            barrier_interval_ms,
            checkpoint_frequency,
        });
        self.create_schema(database_id, DEFAULT_SCHEMA_NAME, owner)
            .await?;
        self.create_schema(database_id, PG_CATALOG_SCHEMA_NAME, owner)
            .await?;
        self.create_schema(database_id, RW_CATALOG_SCHEMA_NAME, owner)
            .await?;
        Ok(())
    }

    async fn create_schema(
        &self,
        db_id: DatabaseId,
        schema_name: &str,
        owner: UserId,
    ) -> Result<()> {
        let id = self.gen_id();
        self.catalog.write().create_schema(&PbSchema {
            id,
            name: schema_name.to_owned(),
            database_id: db_id,
            owner,
        });
        self.add_schema_id(id, db_id);
        Ok(())
    }

    async fn create_materialized_view(
        &self,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
        _dependencies: HashSet<ObjectId>,
        _specific_resource_group: Option<String>,
        _if_not_exists: bool,
    ) -> Result<()> {
        table.id = self.gen_id();
        table.stream_job_status = PbStreamJobStatus::Created as _;
        table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
        self.catalog.write().create_table(&table);
        self.add_table_or_source_id(table.id.as_raw_id(), table.schema_id, table.database_id);
        self.hummock_snapshot_manager.add_table_for_test(table.id);
        Ok(())
    }

    async fn replace_materialized_view(
        &self,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
    ) -> Result<()> {
        table.stream_job_status = PbStreamJobStatus::Created as _;
        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
        self.catalog.write().update_table(&table);
        Ok(())
    }

    async fn create_view(&self, mut view: PbView, _dependencies: HashSet<ObjectId>) -> Result<()> {
        view.id = self.gen_id();
        self.catalog.write().create_view(&view);
        self.add_table_or_source_id(view.id.as_raw_id(), view.schema_id, view.database_id);
        Ok(())
    }

    async fn create_table(
        &self,
        source: Option<PbSource>,
        mut table: PbTable,
        graph: StreamFragmentGraph,
        _job_type: PbTableJobType,
        if_not_exists: bool,
        _dependencies: HashSet<ObjectId>,
    ) -> Result<()> {
        if let Some(source) = source {
            let source_id = self.create_source_inner(source)?;
            table.optional_associated_source_id = Some(source_id.into());
        }
        self.create_materialized_view(table, graph, HashSet::new(), None, if_not_exists)
            .await?;
        Ok(())
    }

    async fn replace_table(
        &self,
        _source: Option<PbSource>,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
        _job_type: TableJobType,
    ) -> Result<()> {
        table.stream_job_status = PbStreamJobStatus::Created as _;
        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
        self.catalog.write().update_table(&table);
        Ok(())
    }

    async fn replace_source(&self, source: PbSource, _graph: StreamFragmentGraph) -> Result<()> {
        self.catalog.write().update_source(&source);
        Ok(())
    }

    async fn create_source(
        &self,
        source: PbSource,
        _graph: Option<StreamFragmentGraph>,
        _if_not_exists: bool,
    ) -> Result<()> {
        self.create_source_inner(source).map(|_| ())
    }

    async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        _dependencies: HashSet<ObjectId>,
        _if_not_exists: bool,
    ) -> Result<()> {
        self.create_sink_inner(sink, graph)
    }

    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
        self.create_subscription_inner(subscription)
    }

    async fn create_index(
        &self,
        mut index: PbIndex,
        mut index_table: PbTable,
        _graph: StreamFragmentGraph,
        _if_not_exists: bool,
    ) -> Result<()> {
        index_table.id = self.gen_id();
        index_table.stream_job_status = PbStreamJobStatus::Created as _;
        index_table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
        self.catalog.write().create_table(&index_table);
        self.add_table_or_index_id(
            index_table.id.as_raw_id(),
            index_table.schema_id,
            index_table.database_id,
        );

        index.id = index_table.id.as_raw_id().into();
        index.index_table_id = index_table.id;
        self.catalog.write().create_index(&index);
        Ok(())
    }

    async fn create_function(&self, _function: PbFunction) -> Result<()> {
        unreachable!()
    }

    async fn create_connection(
        &self,
        _connection_name: String,
        _database_id: DatabaseId,
        _schema_id: SchemaId,
        _owner_id: u32,
        _connection: create_connection_request::Payload,
    ) -> Result<()> {
        unreachable!()
    }

    async fn create_secret(
        &self,
        _secret_name: String,
        _database_id: DatabaseId,
        _schema_id: SchemaId,
        _owner_id: u32,
        _payload: Vec<u8>,
    ) -> Result<()> {
        unreachable!()
    }

    async fn comment_on(&self, _comment: PbComment) -> Result<()> {
        unreachable!()
    }

    async fn drop_table(
        &self,
        source_id: Option<u32>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        if let Some(source_id) = source_id {
            self.drop_table_or_source_id(source_id);
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
        let indexes =
            self.catalog
                .read()
                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
        for index in indexes {
            self.drop_index(index.id, cascade).await?;
        }
        self.catalog
            .write()
            .drop_table(database_id, schema_id, table_id);
        if let Some(source_id) = source_id {
            self.catalog
                .write()
                .drop_source(database_id, schema_id, source_id.into());
        }
        Ok(())
    }

    async fn drop_view(&self, _view_id: ViewId, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
        let indexes =
            self.catalog
                .read()
                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
        for index in indexes {
            self.drop_index(index.id, cascade).await?;
        }
        self.catalog
            .write()
            .drop_table(database_id, schema_id, table_id);
        Ok(())
    }

    async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(source_id.as_raw_id());
        self.catalog
            .write()
            .drop_source(database_id, schema_id, source_id);
        Ok(())
    }

    async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_sink_id(sink_id.as_raw_id());
        self.catalog
            .write()
            .drop_sink(database_id, schema_id, sink_id);
        Ok(())
    }

    async fn drop_subscription(
        &self,
        subscription_id: SubscriptionId,
        cascade: bool,
    ) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) =
            self.drop_table_or_subscription_id(subscription_id.as_raw_id());
        self.catalog
            .write()
            .drop_subscription(database_id, schema_id, subscription_id);
        Ok(())
    }

    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let &schema_id = self
            .table_id_to_schema_id
            .read()
            .get(&index_id.as_raw_id())
            .unwrap();
        let database_id = self.get_database_id_by_schema(schema_id);

        let index = {
            let catalog_reader = self.catalog.read();
            let schema_catalog = catalog_reader
                .get_schema_by_id(database_id, schema_id)
                .unwrap();
            schema_catalog.get_index_by_id(index_id).unwrap().clone()
        };

        let index_table_id = index.index_table().id;
        let (database_id, schema_id) = self.drop_table_or_index_id(index_id.as_raw_id());
        self.catalog
            .write()
            .drop_index(database_id, schema_id, index_id);
        self.catalog
            .write()
            .drop_table(database_id, schema_id, index_table_id);
        Ok(())
    }

    async fn drop_function(&self, _function_id: FunctionId, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_connection(&self, _connection_id: ConnectionId, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_secret(&self, _secret_id: SecretId) -> Result<()> {
        unreachable!()
    }

    async fn drop_database(&self, database_id: DatabaseId) -> Result<()> {
        self.catalog.write().drop_database(database_id);
        Ok(())
    }

    async fn drop_schema(&self, schema_id: SchemaId, _cascade: bool) -> Result<()> {
        let database_id = self.drop_schema_id(schema_id);
        self.catalog.write().drop_schema(database_id, schema_id);
        Ok(())
    }

    async fn alter_name(
        &self,
        object_id: alter_name_request::Object,
        object_name: &str,
    ) -> Result<()> {
        match object_id {
            alter_name_request::Object::TableId(table_id) => {
                self.catalog
                    .write()
                    .alter_table_name_by_id(table_id.into(), object_name);
                Ok(())
            }
            _ => {
                unimplemented!()
            }
        }
    }

    async fn alter_source(&self, source: PbSource) -> Result<()> {
        self.catalog.write().update_source(&source);
        Ok(())
    }

    async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<()> {
        for database in self.catalog.read().iter_databases() {
            for schema in database.iter_schemas() {
                match object {
                    Object::TableId(table_id) => {
                        if let Some(table) = schema.get_created_table_by_id(TableId::from(table_id))
                        {
                            let mut pb_table = table.to_prost();
                            pb_table.owner = owner_id;
                            self.catalog.write().update_table(&pb_table);
                            return Ok(());
                        }
                    }
                    _ => unreachable!(),
                }
            }
        }

        Err(ErrorCode::ItemNotFound(format!("object not found: {:?}", object)).into())
    }

    async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> Result<()> {
        match object {
            alter_set_schema_request::Object::TableId(table_id) => {
                let mut pb_table = {
                    let reader = self.catalog.read();
                    let table = reader.get_any_table_by_id(table_id.into())?.to_owned();
                    table.to_prost()
                };
                pb_table.schema_id = new_schema_id;
                self.catalog.write().update_table(&pb_table);
                self.table_id_to_schema_id
                    .write()
                    .insert(table_id, new_schema_id);
                Ok(())
            }
            _ => unreachable!(),
        }
    }

    async fn alter_parallelism(
        &self,
        _job_id: JobId,
        _parallelism: PbTableParallelism,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }

    async fn alter_swap_rename(&self, _object: alter_swap_rename_request::Object) -> Result<()> {
        todo!()
    }

    async fn alter_secret(
        &self,
        _secret_id: SecretId,
        _secret_name: String,
        _database_id: DatabaseId,
        _schema_id: SchemaId,
        _owner_id: u32,
        _payload: Vec<u8>,
    ) -> Result<()> {
        unreachable!()
    }

    async fn alter_resource_group(
        &self,
        _table_id: TableId,
        _resource_group: Option<String>,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }

    async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> Result<()> {
        let mut pb_database = {
            let reader = self.catalog.read();
            let database = reader.get_database_by_id(database_id)?.to_owned();
            database.to_prost()
        };
        match param {
            AlterDatabaseParam::BarrierIntervalMs(interval) => {
                pb_database.barrier_interval_ms = interval;
            }
            AlterDatabaseParam::CheckpointFrequency(frequency) => {
                pb_database.checkpoint_frequency = frequency;
            }
        }
        self.catalog.write().update_database(&pb_database);
        Ok(())
    }

    async fn create_iceberg_table(
        &self,
        _table_job_info: PbTableJobInfo,
        _sink_job_info: PbSinkJobInfo,
        _iceberg_source: PbSource,
        _if_not_exists: bool,
    ) -> Result<()> {
        todo!()
    }
}

impl MockCatalogWriter {
    pub fn new(
        catalog: Arc<RwLock<Catalog>>,
        hummock_snapshot_manager: HummockSnapshotManagerRef,
    ) -> Self {
        catalog.write().create_database(&PbDatabase {
            id: 0.into(),
            name: DEFAULT_DATABASE_NAME.to_owned(),
            owner: DEFAULT_SUPER_USER_ID,
            resource_group: DEFAULT_RESOURCE_GROUP.to_owned(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        });
        catalog.write().create_schema(&PbSchema {
            id: 1.into(),
            name: DEFAULT_SCHEMA_NAME.to_owned(),
            database_id: 0.into(),
            owner: DEFAULT_SUPER_USER_ID,
        });
        catalog.write().create_schema(&PbSchema {
            id: 2.into(),
            name: PG_CATALOG_SCHEMA_NAME.to_owned(),
            database_id: 0.into(),
            owner: DEFAULT_SUPER_USER_ID,
        });
        catalog.write().create_schema(&PbSchema {
            id: 3.into(),
            name: RW_CATALOG_SCHEMA_NAME.to_owned(),
            database_id: 0.into(),
            owner: DEFAULT_SUPER_USER_ID,
        });
        let mut map: HashMap<SchemaId, DatabaseId> = HashMap::new();
        map.insert(1_u32.into(), 0_u32.into());
        map.insert(2_u32.into(), 0_u32.into());
        map.insert(3_u32.into(), 0_u32.into());
        Self {
            catalog,
            id: AtomicU32::new(3),
            table_id_to_schema_id: Default::default(),
            schema_id_to_database_id: RwLock::new(map),
            hummock_snapshot_manager,
        }
    }

    fn gen_id<T: From<u32>>(&self) -> T {
        // Id 0 is taken by the `dev` database and schema, so skip past it.
        (self.id.fetch_add(1, Ordering::SeqCst) + 1).into()
    }

    fn add_table_or_source_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn drop_table_or_source_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn add_table_or_sink_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn add_table_or_subscription_id(
        &self,
        table_id: u32,
        schema_id: SchemaId,
        _database_id: DatabaseId,
    ) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn add_table_or_index_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn drop_table_or_sink_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn drop_table_or_subscription_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn drop_table_or_index_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn add_schema_id(&self, schema_id: SchemaId, database_id: DatabaseId) {
        self.schema_id_to_database_id
            .write()
            .insert(schema_id, database_id);
    }

    fn drop_schema_id(&self, schema_id: SchemaId) -> DatabaseId {
        self.schema_id_to_database_id
            .write()
            .remove(&schema_id)
            .unwrap()
    }

    fn create_source_inner(&self, mut source: PbSource) -> Result<SourceId> {
        source.id = self.gen_id();
        self.catalog.write().create_source(&source);
        self.add_table_or_source_id(source.id.as_raw_id(), source.schema_id, source.database_id);
        Ok(source.id)
    }

    fn create_sink_inner(&self, mut sink: PbSink, _graph: StreamFragmentGraph) -> Result<()> {
        sink.id = self.gen_id();
        sink.stream_job_status = PbStreamJobStatus::Created as _;
        self.catalog.write().create_sink(&sink);
        self.add_table_or_sink_id(sink.id.as_raw_id(), sink.schema_id, sink.database_id);
        Ok(())
    }

    fn create_subscription_inner(&self, mut subscription: PbSubscription) -> Result<()> {
        subscription.id = self.gen_id();
        self.catalog.write().create_subscription(&subscription);
        self.add_table_or_subscription_id(
            subscription.id.as_raw_id(),
            subscription.schema_id,
            subscription.database_id,
        );
        Ok(())
    }

    fn get_database_id_by_schema(&self, schema_id: SchemaId) -> DatabaseId {
        *self
            .schema_id_to_database_id
            .read()
            .get(&schema_id)
            .unwrap()
    }
}

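/// A [`UserInfoWriter`] backed by an in-memory [`UserInfoManager`], pre-seeded
/// with the default superusers. User ids are allocated from a local counter
/// starting at `NON_RESERVED_USER_ID`.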
pub struct MockUserInfoWriter {
    id: AtomicU32,
    user_info: Arc<RwLock<UserInfoManager>>,
}

#[async_trait::async_trait]
impl UserInfoWriter for MockUserInfoWriter {
    async fn create_user(&self, user: UserInfo) -> Result<()> {
        let mut user = user;
        user.id = self.gen_id();
        self.user_info.write().create_user(user);
        Ok(())
    }

    async fn drop_user(&self, id: UserId) -> Result<()> {
        self.user_info.write().drop_user(id);
        Ok(())
    }

    async fn update_user(
        &self,
        update_user: UserInfo,
        update_fields: Vec<UpdateField>,
    ) -> Result<()> {
        let mut lock = self.user_info.write();
        let id = update_user.get_id();
        let Some(old_name) = lock.get_user_name_by_id(id) else {
            return Ok(());
        };
        let mut user_info = lock.get_user_by_name(&old_name).unwrap().to_prost();
        update_fields.into_iter().for_each(|field| match field {
            UpdateField::Super => user_info.is_super = update_user.is_super,
            UpdateField::Login => user_info.can_login = update_user.can_login,
            UpdateField::CreateDb => user_info.can_create_db = update_user.can_create_db,
            UpdateField::CreateUser => user_info.can_create_user = update_user.can_create_user,
            UpdateField::AuthInfo => user_info.auth_info.clone_from(&update_user.auth_info),
            UpdateField::Rename => user_info.name.clone_from(&update_user.name),
            UpdateField::Admin => user_info.is_admin = update_user.is_admin,
            UpdateField::Unspecified => unreachable!(),
        });
        // Persist the merged user info rather than the raw update payload.
        lock.update_user(user_info);
        Ok(())
    }

    /// `MockUserInfoWriter` does not support expanding privileges via `GrantAllTables` or
    /// `GrantAllSources` when granting privileges to a user.
    async fn grant_privilege(
        &self,
        users: Vec<UserId>,
        privileges: Vec<GrantPrivilege>,
        with_grant_option: bool,
        _grantor: UserId,
    ) -> Result<()> {
        let privileges = privileges
            .into_iter()
            .map(|mut p| {
                p.action_with_opts
                    .iter_mut()
                    .for_each(|ao| ao.with_grant_option = with_grant_option);
                p
            })
            .collect::<Vec<_>>();
        for user_id in users {
            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
                u.extend_privileges(privileges.clone());
            }
        }
        Ok(())
    }

    /// `MockUserInfoWriter` does not support expanding privileges via `RevokeAllTables` or
    /// `RevokeAllSources` when revoking privileges from a user.
    async fn revoke_privilege(
        &self,
        users: Vec<UserId>,
        privileges: Vec<GrantPrivilege>,
        _granted_by: UserId,
        _revoke_by: UserId,
        revoke_grant_option: bool,
        _cascade: bool,
    ) -> Result<()> {
        for user_id in users {
            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
                u.revoke_privileges(privileges.clone(), revoke_grant_option);
            }
        }
        Ok(())
    }

    async fn alter_default_privilege(
        &self,
        _users: Vec<UserId>,
        _database_id: DatabaseId,
        _schemas: Vec<SchemaId>,
        _operation: AlterDefaultPrivilegeOperation,
        _operated_by: UserId,
    ) -> Result<()> {
        todo!()
    }
}

impl MockUserInfoWriter {
    pub fn new(user_info: Arc<RwLock<UserInfoManager>>) -> Self {
        user_info.write().create_user(UserInfo {
            id: DEFAULT_SUPER_USER_ID,
            name: DEFAULT_SUPER_USER.to_owned(),
            is_super: true,
            can_create_db: true,
            can_create_user: true,
            can_login: true,
            ..Default::default()
        });
        user_info.write().create_user(UserInfo {
            id: DEFAULT_SUPER_USER_FOR_ADMIN_ID,
            name: DEFAULT_SUPER_USER_FOR_ADMIN.to_owned(),
            is_super: true,
            can_create_db: true,
            can_create_user: true,
            can_login: true,
            is_admin: true,
            ..Default::default()
        });
        Self {
            user_info,
            id: AtomicU32::new(NON_RESERVED_USER_ID as u32),
        }
    }

    fn gen_id(&self) -> u32 {
        self.id.fetch_add(1, Ordering::SeqCst)
    }
}

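/// A [`FrontendMetaClient`] that serves canned (mostly empty) responses for the
/// calls exercised by planner tests and panics with `unimplemented!()` for the
/// rest.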
pub struct MockFrontendMetaClient {}

#[async_trait::async_trait]
impl FrontendMetaClient for MockFrontendMetaClient {
    async fn try_unregister(&self) {}

    async fn flush(&self, _database_id: DatabaseId) -> RpcResult<HummockVersionId> {
        Ok(INVALID_VERSION_ID)
    }

    async fn wait(&self) -> RpcResult<()> {
        Ok(())
    }

    async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult<Vec<u32>> {
        Ok(vec![])
    }

    async fn list_table_fragments(
        &self,
        _table_ids: &[JobId],
    ) -> RpcResult<HashMap<JobId, TableFragmentInfo>> {
        Ok(HashMap::default())
    }

    async fn list_streaming_job_states(&self) -> RpcResult<Vec<StreamingJobState>> {
        Ok(vec![])
    }

    async fn list_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
        Ok(vec![])
    }

    async fn list_creating_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
        Ok(vec![])
    }

    async fn list_actor_states(&self) -> RpcResult<Vec<ActorState>> {
        Ok(vec![])
    }

    async fn list_actor_splits(&self) -> RpcResult<Vec<ActorSplit>> {
        Ok(vec![])
    }

    async fn list_object_dependencies(&self) -> RpcResult<Vec<PbObjectDependencies>> {
        Ok(vec![])
    }

    async fn list_meta_snapshots(&self) -> RpcResult<Vec<MetaSnapshotMetadata>> {
        Ok(vec![])
    }

    async fn set_system_param(
        &self,
        _param: String,
        _value: Option<String>,
    ) -> RpcResult<Option<SystemParamsReader>> {
        Ok(Some(SystemParams::default().into()))
    }

    async fn get_session_params(&self) -> RpcResult<SessionConfig> {
        Ok(Default::default())
    }

    async fn set_session_param(&self, _param: String, _value: Option<String>) -> RpcResult<String> {
        Ok("".to_owned())
    }

    async fn get_ddl_progress(&self) -> RpcResult<Vec<DdlProgress>> {
        Ok(vec![])
    }

    async fn get_tables(
        &self,
        _table_ids: Vec<crate::catalog::TableId>,
        _include_dropped_tables: bool,
    ) -> RpcResult<HashMap<crate::catalog::TableId, Table>> {
        Ok(HashMap::new())
    }

    async fn list_hummock_pinned_versions(&self) -> RpcResult<Vec<(WorkerId, u64)>> {
        unimplemented!()
    }

    async fn get_hummock_current_version(&self) -> RpcResult<HummockVersion> {
        Ok(HummockVersion::default())
    }

    async fn get_hummock_checkpoint_version(&self) -> RpcResult<HummockVersion> {
        unimplemented!()
    }

    async fn list_version_deltas(&self) -> RpcResult<Vec<HummockVersionDelta>> {
        unimplemented!()
    }

    async fn list_branched_objects(&self) -> RpcResult<Vec<BranchedObject>> {
        unimplemented!()
    }

    async fn list_hummock_compaction_group_configs(&self) -> RpcResult<Vec<CompactionGroupInfo>> {
        unimplemented!()
    }

    async fn list_hummock_active_write_limits(&self) -> RpcResult<HashMap<u64, WriteLimit>> {
        unimplemented!()
    }

    async fn list_hummock_meta_configs(&self) -> RpcResult<HashMap<String, String>> {
        unimplemented!()
    }

    async fn list_event_log(&self) -> RpcResult<Vec<EventLog>> {
        unimplemented!()
    }

    async fn list_compact_task_assignment(&self) -> RpcResult<Vec<CompactTaskAssignment>> {
        unimplemented!()
    }

    async fn list_all_nodes(&self) -> RpcResult<Vec<WorkerNode>> {
        Ok(vec![])
    }

    async fn list_compact_task_progress(&self) -> RpcResult<Vec<CompactTaskProgress>> {
        unimplemented!()
    }

    async fn recover(&self) -> RpcResult<()> {
        unimplemented!()
    }

    async fn apply_throttle(
        &self,
        _kind: PbThrottleTarget,
        _id: u32,
        _rate_limit: Option<u32>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn alter_fragment_parallelism(
        &self,
        _fragment_ids: Vec<FragmentId>,
        _parallelism: Option<PbTableParallelism>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn get_cluster_recovery_status(&self) -> RpcResult<RecoveryStatus> {
        Ok(RecoveryStatus::StatusRunning)
    }

    async fn get_cluster_limits(&self) -> RpcResult<Vec<ClusterLimit>> {
        Ok(vec![])
    }

    async fn list_rate_limits(&self) -> RpcResult<Vec<RateLimitInfo>> {
        Ok(vec![])
    }

    async fn list_cdc_progress(&self) -> RpcResult<HashMap<JobId, PbCdcProgress>> {
        Ok(HashMap::default())
    }

    async fn get_meta_store_endpoint(&self) -> RpcResult<String> {
        unimplemented!()
    }

    async fn alter_sink_props(
        &self,
        _sink_id: SinkId,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
        _connector_conn_ref: Option<ConnectionId>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn alter_iceberg_table_props(
        &self,
        _table_id: TableId,
        _sink_id: SinkId,
        _source_id: SourceId,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
        _connector_conn_ref: Option<ConnectionId>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn alter_source_connector_props(
        &self,
        _source_id: SourceId,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
        _connector_conn_ref: Option<ConnectionId>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn list_hosted_iceberg_tables(&self) -> RpcResult<Vec<IcebergTable>> {
        unimplemented!()
    }

    async fn get_fragment_by_id(
        &self,
        _fragment_id: FragmentId,
    ) -> RpcResult<Option<FragmentDistribution>> {
        unimplemented!()
    }

    async fn get_fragment_vnodes(
        &self,
        _fragment_id: FragmentId,
    ) -> RpcResult<Vec<(ActorId, Vec<u32>)>> {
        unimplemented!()
    }

    async fn get_actor_vnodes(&self, _actor_id: ActorId) -> RpcResult<Vec<u32>> {
        unimplemented!()
    }

    fn worker_id(&self) -> WorkerId {
        0.into()
    }

    async fn set_sync_log_store_aligned(&self, _job_id: JobId, _aligned: bool) -> RpcResult<()> {
        Ok(())
    }

    async fn compact_iceberg_table(&self, _sink_id: SinkId) -> RpcResult<u64> {
        Ok(1)
    }

    async fn expire_iceberg_table_snapshots(&self, _sink_id: SinkId) -> RpcResult<()> {
        Ok(())
    }

    async fn refresh(&self, _request: RefreshRequest) -> RpcResult<RefreshResponse> {
        Ok(RefreshResponse { status: None })
    }

    fn cluster_id(&self) -> &str {
        "test-cluster-uuid"
    }

    async fn list_unmigrated_tables(&self) -> RpcResult<HashMap<crate::catalog::TableId, String>> {
        unimplemented!()
    }
}

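/// A sample protobuf schema for source tests. `TestRecordAlterType` and
/// `TestRecordExt` are variants of `TestRecord` (a changed field type and an
/// added field, respectively) used to exercise schema-change handling.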
#[cfg(test)]
pub static PROTO_FILE_DATA: &str = r#"
    syntax = "proto3";
    package test;
    message TestRecord {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
    }
    message TestRecordAlterType {
      string id = 1;
      Country country = 3;
      int32 zipcode = 4;
      float rate = 5;
    }
    message TestRecordExt {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
      string name = 6;
    }
    message Country {
      string address = 1;
      City city = 2;
      string zipcode = 3;
    }
    message City {
      string address = 1;
      string zipcode = 2;
    }"#;

/// Compiles `proto_data` with `protoc` and returns the resulting file descriptor set.
/// (`NamedTempFile` automatically deletes the file when it goes out of scope.)
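///
/// A usage sketch (hypothetical; requires `protoc` on the `PATH`):
///
/// ```ignore
/// let file = create_proto_file(PROTO_FILE_DATA);
/// // Pass the descriptor's path, e.g. as a `file://` schema location, to the
/// // statement under test.
/// let location = format!("file://{}", file.path().to_string_lossy());
/// ```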
pub fn create_proto_file(proto_data: &str) -> NamedTempFile {
    let in_file = Builder::new()
        .prefix("temp")
        .suffix(".proto")
        .rand_bytes(8)
        .tempfile()
        .unwrap();

    let out_file = Builder::new()
        .prefix("temp")
        .suffix(".pb")
        .rand_bytes(8)
        .tempfile()
        .unwrap();

    let mut file = in_file.as_file();
    file.write_all(proto_data.as_ref())
        .expect("writing binary to test file");
    file.flush().expect("flush temp file failed");
    let include_path = in_file
        .path()
        .parent()
        .unwrap()
        .to_string_lossy()
        .into_owned();
    let out_path = out_file.path().to_string_lossy().into_owned();
    let in_path = in_file.path().to_string_lossy().into_owned();
    let mut compile = std::process::Command::new("protoc");

    let out = compile
        .arg("--include_imports")
        .arg("-I")
        .arg(include_path)
        .arg(format!("--descriptor_set_out={}", out_path))
        .arg(in_path)
        .output()
        .expect("failed to compile proto");
    if !out.status.success() {
        panic!("compile proto failed \n output: {:?}", out);
    }
    out_file
}