risingwave_frontend/test_utils.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::io::Write;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};

use futures_async_stream::for_await;
use parking_lot::RwLock;
use pgwire::net::{Address, AddressRef};
use pgwire::pg_response::StatementType;
use pgwire::pg_server::{BoxedError, SessionId, SessionManager, UserAuthenticator};
use pgwire::types::Row;
use risingwave_common::catalog::{
    AlterDatabaseParam, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER,
    DEFAULT_SUPER_USER_FOR_ADMIN, DEFAULT_SUPER_USER_FOR_ADMIN_ID, DEFAULT_SUPER_USER_ID,
    FunctionId, IndexId, NON_RESERVED_USER_ID, ObjectId, PG_CATALOG_SCHEMA_NAME,
    RW_CATALOG_SCHEMA_NAME, TableId,
};
use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat};
use risingwave_common::id::{ConnectionId, JobId, SourceId, SubscriptionId, ViewId, WorkerId};
use risingwave_common::session_config::SessionConfig;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::util::cluster_limit::ClusterLimit;
use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{HummockVersionId, INVALID_VERSION_ID};
use risingwave_pb::backup_service::MetaSnapshotMetadata;
use risingwave_pb::catalog::{
    PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbStreamJobStatus,
    PbSubscription, PbTable, PbView, Table,
};
use risingwave_pb::common::WorkerNode;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
use risingwave_pb::ddl_service::{
    DdlProgress, PbTableJobType, TableJobType, alter_name_request, alter_set_schema_request,
    alter_swap_rename_request, create_connection_request,
};
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::{
    BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
};
use risingwave_pb::id::ActorId;
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::{
    EventLog, FragmentDistribution, PbTableParallelism, PbThrottleTarget, RecoveryStatus,
    RefreshRequest, RefreshResponse, SystemParams,
};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
use risingwave_pb::user::update_user_request::UpdateField;
use risingwave_pb::user::{GrantPrivilege, UserInfo};
use risingwave_rpc_client::error::Result as RpcResult;
use tempfile::{Builder, NamedTempFile};

use crate::FrontendOpts;
use crate::catalog::catalog_service::CatalogWriter;
use crate::catalog::root_catalog::Catalog;
use crate::catalog::{DatabaseId, FragmentId, SchemaId, SecretId, SinkId};
use crate::error::{ErrorCode, Result};
use crate::handler::RwPgResponse;
use crate::meta_client::FrontendMetaClient;
use crate::scheduler::HummockSnapshotManagerRef;
use crate::session::{AuthContext, FrontendEnv, SessionImpl};
use crate::user::UserId;
use crate::user::user_manager::UserInfoManager;
use crate::user::user_service::UserInfoWriter;

/// An embedded frontend for tests that starts neither a meta node nor a frontend TCP server.
pub struct LocalFrontend {
    pub opts: FrontendOpts,
    env: FrontendEnv,
}

impl SessionManager for LocalFrontend {
    type Session = SessionImpl;

    fn create_dummy_session(
        &self,
        _database_id: DatabaseId,
        _user_name: u32,
    ) -> std::result::Result<Arc<Self::Session>, BoxedError> {
        unreachable!()
    }

    fn connect(
        &self,
        _database: &str,
        _user_name: &str,
        _peer_addr: AddressRef,
    ) -> std::result::Result<Arc<Self::Session>, BoxedError> {
        Ok(self.session_ref())
    }

    fn cancel_queries_in_session(&self, _session_id: SessionId) {
        unreachable!()
    }

    fn cancel_creating_jobs_in_session(&self, _session_id: SessionId) {
        unreachable!()
    }

    fn end_session(&self, _session: &Self::Session) {
        unreachable!()
    }
}

impl LocalFrontend {
    #[expect(clippy::unused_async)]
    pub async fn new(opts: FrontendOpts) -> Self {
        let env = FrontendEnv::mock();
        Self { opts, env }
    }

    pub async fn run_sql(
        &self,
        sql: impl Into<String>,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        self.session_ref().run_statement(sql, vec![]).await
    }

    pub async fn run_sql_with_session(
        &self,
        session_ref: Arc<SessionImpl>,
        sql: impl Into<String>,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        session_ref.run_statement(sql, vec![]).await
    }

    pub async fn run_user_sql(
        &self,
        sql: impl Into<String>,
        database: String,
        user_name: String,
        user_id: UserId,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        self.session_user_ref(database, user_name, user_id)
            .run_statement(sql, vec![])
            .await
    }

    pub async fn query_formatted_result(&self, sql: impl Into<String>) -> Vec<String> {
        let mut rsp = self.run_sql(sql).await.unwrap();
        let mut res = vec![];
        #[for_await]
        for row_set in rsp.values_stream() {
            for row in row_set.unwrap() {
                res.push(format!("{:?}", row));
            }
        }
        res
    }

    pub async fn get_explain_output(&self, sql: impl Into<String>) -> String {
        let mut rsp = self.run_sql(sql).await.unwrap();
        assert_eq!(rsp.stmt_type(), StatementType::EXPLAIN);
        let mut res = String::new();
        #[for_await]
        for row_set in rsp.values_stream() {
            for row in row_set.unwrap() {
                let row: Row = row;
                let row = row.values()[0].as_ref().unwrap();
                res += std::str::from_utf8(row).unwrap();
                res += "\n";
            }
        }
        res
    }

    /// Creates a new session as the default super user on the default database.
    pub fn session_ref(&self) -> Arc<SessionImpl> {
        self.session_user_ref(
            DEFAULT_DATABASE_NAME.to_owned(),
            DEFAULT_SUPER_USER.to_owned(),
            DEFAULT_SUPER_USER_ID,
        )
    }

    pub fn session_user_ref(
        &self,
        database: String,
        user_name: String,
        user_id: UserId,
    ) -> Arc<SessionImpl> {
        Arc::new(SessionImpl::new(
            self.env.clone(),
            AuthContext::new(database, user_name, user_id),
            UserAuthenticator::None,
            // `LocalFrontend` uses a dummy session id.
            (0, 0),
            Address::Tcp(SocketAddr::new(
                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
                6666,
            ))
            .into(),
            Default::default(),
        ))
    }
}
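
// A minimal usage sketch for driving the embedded frontend from a test. This
// assumes `FrontendOpts` implements `Default` and that the statements are
// supported by the mocked environment; it is illustrative, not normative:
//
//     let frontend = LocalFrontend::new(FrontendOpts::default()).await;
//     frontend.run_sql("CREATE TABLE t (v INT)").await.unwrap();
//     let plan = frontend.get_explain_output("EXPLAIN SELECT v FROM t").await;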

pub async fn get_explain_output(mut rsp: RwPgResponse) -> String {
    if rsp.stmt_type() != StatementType::EXPLAIN {
        panic!("RESPONSE INVALID: {rsp:?}");
    }
    let mut res = String::new();
    #[for_await]
    for row_set in rsp.values_stream() {
        for row in row_set.unwrap() {
            let row: Row = row;
            let row = row.values()[0].as_ref().unwrap();
            res += std::str::from_utf8(row).unwrap();
            res += "\n";
        }
    }
    res
}

pub struct MockCatalogWriter {
    catalog: Arc<RwLock<Catalog>>,
    id: AtomicU32,
    table_id_to_schema_id: RwLock<HashMap<u32, SchemaId>>,
    schema_id_to_database_id: RwLock<HashMap<SchemaId, DatabaseId>>,
    hummock_snapshot_manager: HummockSnapshotManagerRef,
}

#[async_trait::async_trait]
impl CatalogWriter for MockCatalogWriter {
    async fn create_database(
        &self,
        db_name: &str,
        owner: UserId,
        resource_group: &str,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
    ) -> Result<()> {
        let database_id = DatabaseId::new(self.gen_id());
        self.catalog.write().create_database(&PbDatabase {
            name: db_name.to_owned(),
            id: database_id,
            owner,
            resource_group: resource_group.to_owned(),
            barrier_interval_ms,
            checkpoint_frequency,
        });
        self.create_schema(database_id, DEFAULT_SCHEMA_NAME, owner)
            .await?;
        self.create_schema(database_id, PG_CATALOG_SCHEMA_NAME, owner)
            .await?;
        self.create_schema(database_id, RW_CATALOG_SCHEMA_NAME, owner)
            .await?;
        Ok(())
    }

    async fn create_schema(
        &self,
        db_id: DatabaseId,
        schema_name: &str,
        owner: UserId,
    ) -> Result<()> {
        let id = self.gen_id();
        self.catalog.write().create_schema(&PbSchema {
            id,
            name: schema_name.to_owned(),
            database_id: db_id,
            owner,
        });
        self.add_schema_id(id, db_id);
        Ok(())
    }

    async fn create_materialized_view(
        &self,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
        _dependencies: HashSet<ObjectId>,
        _specific_resource_group: Option<String>,
        _if_not_exists: bool,
    ) -> Result<()> {
        table.id = self.gen_id();
        table.stream_job_status = PbStreamJobStatus::Created as _;
        table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
        self.catalog.write().create_table(&table);
        self.add_table_or_source_id(table.id.as_raw_id(), table.schema_id, table.database_id);
        self.hummock_snapshot_manager.add_table_for_test(table.id);
        Ok(())
    }

    async fn replace_materialized_view(
        &self,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
    ) -> Result<()> {
        table.stream_job_status = PbStreamJobStatus::Created as _;
        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
        self.catalog.write().update_table(&table);
        Ok(())
    }

    async fn create_view(&self, mut view: PbView, _dependencies: HashSet<ObjectId>) -> Result<()> {
        view.id = self.gen_id();
        self.catalog.write().create_view(&view);
        self.add_table_or_source_id(view.id.as_raw_id(), view.schema_id, view.database_id);
        Ok(())
    }

    async fn create_table(
        &self,
        source: Option<PbSource>,
        mut table: PbTable,
        graph: StreamFragmentGraph,
        _job_type: PbTableJobType,
        if_not_exists: bool,
        _dependencies: HashSet<ObjectId>,
    ) -> Result<()> {
        if let Some(source) = source {
            let source_id = self.create_source_inner(source)?;
            table.optional_associated_source_id = Some(source_id.into());
        }
        self.create_materialized_view(table, graph, HashSet::new(), None, if_not_exists)
            .await?;
        Ok(())
    }

    async fn replace_table(
        &self,
        _source: Option<PbSource>,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
        _job_type: TableJobType,
    ) -> Result<()> {
        table.stream_job_status = PbStreamJobStatus::Created as _;
        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
        self.catalog.write().update_table(&table);
        Ok(())
    }

    async fn replace_source(&self, source: PbSource, _graph: StreamFragmentGraph) -> Result<()> {
        self.catalog.write().update_source(&source);
        Ok(())
    }

    async fn create_source(
        &self,
        source: PbSource,
        _graph: Option<StreamFragmentGraph>,
        _if_not_exists: bool,
    ) -> Result<()> {
        self.create_source_inner(source).map(|_| ())
    }

    async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        _dependencies: HashSet<ObjectId>,
        _if_not_exists: bool,
    ) -> Result<()> {
        self.create_sink_inner(sink, graph)
    }

    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
        self.create_subscription_inner(subscription)
    }

    async fn create_index(
        &self,
        mut index: PbIndex,
        mut index_table: PbTable,
        _graph: StreamFragmentGraph,
        _if_not_exists: bool,
    ) -> Result<()> {
        index_table.id = self.gen_id();
        index_table.stream_job_status = PbStreamJobStatus::Created as _;
        index_table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
        self.catalog.write().create_table(&index_table);
        self.add_table_or_index_id(
            index_table.id.as_raw_id(),
            index_table.schema_id,
            index_table.database_id,
        );

        index.id = index_table.id.as_raw_id().into();
        index.index_table_id = index_table.id;
        self.catalog.write().create_index(&index);
        Ok(())
    }

    async fn create_function(&self, _function: PbFunction) -> Result<()> {
        unreachable!()
    }

    async fn create_connection(
        &self,
        _connection_name: String,
        _database_id: DatabaseId,
        _schema_id: SchemaId,
        _owner_id: u32,
        _connection: create_connection_request::Payload,
    ) -> Result<()> {
        unreachable!()
    }

    async fn create_secret(
        &self,
        _secret_name: String,
        _database_id: DatabaseId,
        _schema_id: SchemaId,
        _owner_id: u32,
        _payload: Vec<u8>,
    ) -> Result<()> {
        unreachable!()
    }

    async fn comment_on(&self, _comment: PbComment) -> Result<()> {
        unreachable!()
    }

    async fn drop_table(
        &self,
        source_id: Option<u32>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        if let Some(source_id) = source_id {
            self.drop_table_or_source_id(source_id);
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
        let indexes =
            self.catalog
                .read()
                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
        for index in indexes {
            self.drop_index(index.id, cascade).await?;
        }
        self.catalog
            .write()
            .drop_table(database_id, schema_id, table_id);
        if let Some(source_id) = source_id {
            self.catalog
                .write()
                .drop_source(database_id, schema_id, source_id.into());
        }
        Ok(())
    }

    async fn drop_view(&self, _view_id: ViewId, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
        let indexes =
            self.catalog
                .read()
                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
        for index in indexes {
            self.drop_index(index.id, cascade).await?;
        }
        self.catalog
            .write()
            .drop_table(database_id, schema_id, table_id);
        Ok(())
    }

    async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(source_id.as_raw_id());
        self.catalog
            .write()
            .drop_source(database_id, schema_id, source_id);
        Ok(())
    }

    async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_sink_id(sink_id.as_raw_id());
        self.catalog
            .write()
            .drop_sink(database_id, schema_id, sink_id);
        Ok(())
    }

    async fn drop_subscription(
        &self,
        subscription_id: SubscriptionId,
        cascade: bool,
    ) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) =
            self.drop_table_or_subscription_id(subscription_id.as_raw_id());
        self.catalog
            .write()
            .drop_subscription(database_id, schema_id, subscription_id);
        Ok(())
    }

    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let &schema_id = self
            .table_id_to_schema_id
            .read()
            .get(&index_id.as_raw_id())
            .unwrap();
        let database_id = self.get_database_id_by_schema(schema_id);

        let index = {
            let catalog_reader = self.catalog.read();
            let schema_catalog = catalog_reader
                .get_schema_by_id(database_id, schema_id)
                .unwrap();
            schema_catalog.get_index_by_id(index_id).unwrap().clone()
        };

        let index_table_id = index.index_table().id;
        let (database_id, schema_id) = self.drop_table_or_index_id(index_id.as_raw_id());
        self.catalog
            .write()
            .drop_index(database_id, schema_id, index_id);
        self.catalog
            .write()
            .drop_table(database_id, schema_id, index_table_id);
        Ok(())
    }

    async fn drop_function(&self, _function_id: FunctionId, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_connection(&self, _connection_id: ConnectionId, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_secret(&self, _secret_id: SecretId) -> Result<()> {
        unreachable!()
    }

    async fn drop_database(&self, database_id: DatabaseId) -> Result<()> {
        self.catalog.write().drop_database(database_id);
        Ok(())
    }

    async fn drop_schema(&self, schema_id: SchemaId, _cascade: bool) -> Result<()> {
        let database_id = self.drop_schema_id(schema_id);
        self.catalog.write().drop_schema(database_id, schema_id);
        Ok(())
    }

    async fn alter_name(
        &self,
        object_id: alter_name_request::Object,
        object_name: &str,
    ) -> Result<()> {
        match object_id {
            alter_name_request::Object::TableId(table_id) => {
                self.catalog
                    .write()
                    .alter_table_name_by_id(table_id.into(), object_name);
                Ok(())
            }
            _ => {
                unimplemented!()
            }
        }
    }

    async fn alter_source(&self, source: PbSource) -> Result<()> {
        self.catalog.write().update_source(&source);
        Ok(())
    }

    async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<()> {
        for database in self.catalog.read().iter_databases() {
            for schema in database.iter_schemas() {
                match object {
                    Object::TableId(table_id) => {
                        if let Some(table) = schema.get_created_table_by_id(TableId::from(table_id))
                        {
                            let mut pb_table = table.to_prost();
                            pb_table.owner = owner_id;
                            self.catalog.write().update_table(&pb_table);
                            return Ok(());
                        }
                    }
                    _ => unreachable!(),
                }
            }
        }

        Err(ErrorCode::ItemNotFound(format!("object not found: {:?}", object)).into())
    }

    async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> Result<()> {
        match object {
            alter_set_schema_request::Object::TableId(table_id) => {
                let mut pb_table = {
                    let reader = self.catalog.read();
                    let table = reader.get_any_table_by_id(table_id.into())?.to_owned();
                    table.to_prost()
                };
                pb_table.schema_id = new_schema_id;
                self.catalog.write().update_table(&pb_table);
                self.table_id_to_schema_id
                    .write()
                    .insert(table_id, new_schema_id);
                Ok(())
            }
            _ => unreachable!(),
        }
    }

    async fn alter_parallelism(
        &self,
        _job_id: JobId,
        _parallelism: PbTableParallelism,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }

    async fn alter_config(
        &self,
        _job_id: JobId,
        _entries_to_add: HashMap<String, String>,
        _keys_to_remove: Vec<String>,
    ) -> Result<()> {
        todo!()
    }

    async fn alter_swap_rename(&self, _object: alter_swap_rename_request::Object) -> Result<()> {
        todo!()
    }

    async fn alter_secret(
        &self,
        _secret_id: SecretId,
        _secret_name: String,
        _database_id: DatabaseId,
        _schema_id: SchemaId,
        _owner_id: u32,
        _payload: Vec<u8>,
    ) -> Result<()> {
        unreachable!()
    }

    async fn alter_resource_group(
        &self,
        _table_id: TableId,
        _resource_group: Option<String>,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }

    async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> Result<()> {
        let mut pb_database = {
            let reader = self.catalog.read();
            let database = reader.get_database_by_id(database_id)?.to_owned();
            database.to_prost()
        };
        match param {
            AlterDatabaseParam::BarrierIntervalMs(interval) => {
                pb_database.barrier_interval_ms = interval;
            }
            AlterDatabaseParam::CheckpointFrequency(frequency) => {
                pb_database.checkpoint_frequency = frequency;
            }
        }
        self.catalog.write().update_database(&pb_database);
        Ok(())
    }

    async fn create_iceberg_table(
        &self,
        _table_job_info: PbTableJobInfo,
        _sink_job_info: PbSinkJobInfo,
        _iceberg_source: PbSource,
        _if_not_exists: bool,
    ) -> Result<()> {
        todo!()
    }
}

impl MockCatalogWriter {
    pub fn new(
        catalog: Arc<RwLock<Catalog>>,
        hummock_snapshot_manager: HummockSnapshotManagerRef,
    ) -> Self {
        catalog.write().create_database(&PbDatabase {
            id: 0.into(),
            name: DEFAULT_DATABASE_NAME.to_owned(),
            owner: DEFAULT_SUPER_USER_ID,
            resource_group: DEFAULT_RESOURCE_GROUP.to_owned(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        });
        catalog.write().create_schema(&PbSchema {
            id: 1.into(),
            name: DEFAULT_SCHEMA_NAME.to_owned(),
            database_id: 0.into(),
            owner: DEFAULT_SUPER_USER_ID,
        });
        catalog.write().create_schema(&PbSchema {
            id: 2.into(),
            name: PG_CATALOG_SCHEMA_NAME.to_owned(),
            database_id: 0.into(),
            owner: DEFAULT_SUPER_USER_ID,
        });
        catalog.write().create_schema(&PbSchema {
            id: 3.into(),
            name: RW_CATALOG_SCHEMA_NAME.to_owned(),
            database_id: 0.into(),
            owner: DEFAULT_SUPER_USER_ID,
        });
        let mut map: HashMap<SchemaId, DatabaseId> = HashMap::new();
        map.insert(1_u32.into(), 0_u32.into());
        map.insert(2_u32.into(), 0_u32.into());
        map.insert(3_u32.into(), 0_u32.into());
        Self {
            catalog,
            id: AtomicU32::new(3),
            table_id_to_schema_id: Default::default(),
            schema_id_to_database_id: RwLock::new(map),
            hummock_snapshot_manager,
        }
    }

    fn gen_id<T: From<u32>>(&self) -> T {
        // Ids 0..=3 are pre-assigned to the `dev` database and its built-in
        // schemas in `new()`, so the first id handed out here is 4.
        (self.id.fetch_add(1, Ordering::SeqCst) + 1).into()
    }

    fn add_table_or_source_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn drop_table_or_source_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn add_table_or_sink_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn add_table_or_subscription_id(
        &self,
        table_id: u32,
        schema_id: SchemaId,
        _database_id: DatabaseId,
    ) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn add_table_or_index_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn drop_table_or_sink_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn drop_table_or_subscription_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn drop_table_or_index_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn add_schema_id(&self, schema_id: SchemaId, database_id: DatabaseId) {
        self.schema_id_to_database_id
            .write()
            .insert(schema_id, database_id);
    }

    fn drop_schema_id(&self, schema_id: SchemaId) -> DatabaseId {
        self.schema_id_to_database_id
            .write()
            .remove(&schema_id)
            .unwrap()
    }

    fn create_source_inner(&self, mut source: PbSource) -> Result<SourceId> {
        source.id = self.gen_id();
        self.catalog.write().create_source(&source);
        self.add_table_or_source_id(source.id.as_raw_id(), source.schema_id, source.database_id);
        Ok(source.id)
    }

    fn create_sink_inner(&self, mut sink: PbSink, _graph: StreamFragmentGraph) -> Result<()> {
        sink.id = self.gen_id();
        sink.stream_job_status = PbStreamJobStatus::Created as _;
        self.catalog.write().create_sink(&sink);
        self.add_table_or_sink_id(sink.id.as_raw_id(), sink.schema_id, sink.database_id);
        Ok(())
    }

    fn create_subscription_inner(&self, mut subscription: PbSubscription) -> Result<()> {
        subscription.id = self.gen_id();
        self.catalog.write().create_subscription(&subscription);
        self.add_table_or_subscription_id(
            subscription.id.as_raw_id(),
            subscription.schema_id,
            subscription.database_id,
        );
        Ok(())
    }

    fn get_database_id_by_schema(&self, schema_id: SchemaId) -> DatabaseId {
        *self
            .schema_id_to_database_id
            .read()
            .get(&schema_id)
            .unwrap()
    }
}
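
// A worked sketch of the id bookkeeping above, assuming a freshly constructed
// writer (`catalog`, `hummock_snapshot_manager` and `owner` are placeholders):
//
//     let writer = MockCatalogWriter::new(catalog, hummock_snapshot_manager);
//     writer.create_database("db", owner, "default", None, None).await?;
//     // `gen_id` hands out 4 for the database, then 5, 6 and 7 for its
//     // default, `pg_catalog` and `rw_catalog` schemas; each schema id is
//     // recorded in `schema_id_to_database_id`, mapping back to database 4.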

pub struct MockUserInfoWriter {
    id: AtomicU32,
    user_info: Arc<RwLock<UserInfoManager>>,
}

#[async_trait::async_trait]
impl UserInfoWriter for MockUserInfoWriter {
    async fn create_user(&self, user: UserInfo) -> Result<()> {
        let mut user = user;
        user.id = self.gen_id();
        self.user_info.write().create_user(user);
        Ok(())
    }

    async fn drop_user(&self, id: UserId) -> Result<()> {
        self.user_info.write().drop_user(id);
        Ok(())
    }

    async fn update_user(
        &self,
        update_user: UserInfo,
        update_fields: Vec<UpdateField>,
    ) -> Result<()> {
        let mut lock = self.user_info.write();
        let id = update_user.get_id();
        let Some(old_name) = lock.get_user_name_by_id(id) else {
            return Ok(());
        };
        let mut user_info = lock.get_user_by_name(&old_name).unwrap().to_prost();
        update_fields.into_iter().for_each(|field| match field {
            UpdateField::Super => user_info.is_super = update_user.is_super,
            UpdateField::Login => user_info.can_login = update_user.can_login,
            UpdateField::CreateDb => user_info.can_create_db = update_user.can_create_db,
            UpdateField::CreateUser => user_info.can_create_user = update_user.can_create_user,
            UpdateField::AuthInfo => user_info.auth_info.clone_from(&update_user.auth_info),
            UpdateField::Rename => user_info.name.clone_from(&update_user.name),
            UpdateField::Admin => user_info.is_admin = update_user.is_admin,
            UpdateField::Unspecified => unreachable!(),
        });
        // Persist the selectively-updated info rather than the raw request,
        // so only the fields listed in `update_fields` change.
        lock.update_user(user_info);
        Ok(())
    }

    /// `MockUserInfoWriter` does not support expanding privileges via
    /// `GrantAllTables` or `GrantAllSources` when granting privileges to a user.
    async fn grant_privilege(
        &self,
        users: Vec<UserId>,
        privileges: Vec<GrantPrivilege>,
        with_grant_option: bool,
        _grantor: UserId,
    ) -> Result<()> {
        let privileges = privileges
            .into_iter()
            .map(|mut p| {
                p.action_with_opts
                    .iter_mut()
                    .for_each(|ao| ao.with_grant_option = with_grant_option);
                p
            })
            .collect::<Vec<_>>();
        for user_id in users {
            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
                u.extend_privileges(privileges.clone());
            }
        }
        Ok(())
    }

    /// `MockUserInfoWriter` does not support expanding privileges via
    /// `RevokeAllTables` or `RevokeAllSources` when revoking privileges from a user.
    async fn revoke_privilege(
        &self,
        users: Vec<UserId>,
        privileges: Vec<GrantPrivilege>,
        _granted_by: UserId,
        _revoke_by: UserId,
        revoke_grant_option: bool,
        _cascade: bool,
    ) -> Result<()> {
        for user_id in users {
            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
                u.revoke_privileges(privileges.clone(), revoke_grant_option);
            }
        }
        Ok(())
    }

    async fn alter_default_privilege(
        &self,
        _users: Vec<UserId>,
        _database_id: DatabaseId,
        _schemas: Vec<SchemaId>,
        _operation: AlterDefaultPrivilegeOperation,
        _operated_by: UserId,
    ) -> Result<()> {
        todo!()
    }
}

impl MockUserInfoWriter {
    pub fn new(user_info: Arc<RwLock<UserInfoManager>>) -> Self {
        user_info.write().create_user(UserInfo {
            id: DEFAULT_SUPER_USER_ID,
            name: DEFAULT_SUPER_USER.to_owned(),
            is_super: true,
            can_create_db: true,
            can_create_user: true,
            can_login: true,
            ..Default::default()
        });
        user_info.write().create_user(UserInfo {
            id: DEFAULT_SUPER_USER_FOR_ADMIN_ID,
            name: DEFAULT_SUPER_USER_FOR_ADMIN.to_owned(),
            is_super: true,
            can_create_db: true,
            can_create_user: true,
            can_login: true,
            is_admin: true,
            ..Default::default()
        });
        Self {
            user_info,
            id: AtomicU32::new(NON_RESERVED_USER_ID as u32),
        }
    }

    fn gen_id(&self) -> u32 {
        self.id.fetch_add(1, Ordering::SeqCst)
    }
}
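
// A minimal sketch of the grant flow (`writer`, `alice_id`, `privilege` and
// `grantor_id` are placeholders for values set up by the test):
//
//     writer.grant_privilege(vec![alice_id], vec![privilege], true, grantor_id).await?;
//     // Every action in `privilege` is stored with `with_grant_option = true`.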

pub struct MockFrontendMetaClient {}

#[async_trait::async_trait]
impl FrontendMetaClient for MockFrontendMetaClient {
    async fn try_unregister(&self) {}

    async fn flush(&self, _database_id: DatabaseId) -> RpcResult<HummockVersionId> {
        Ok(INVALID_VERSION_ID)
    }

    async fn wait(&self) -> RpcResult<()> {
        Ok(())
    }

    async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult<Vec<u32>> {
        Ok(vec![])
    }

    async fn list_table_fragments(
        &self,
        _table_ids: &[JobId],
    ) -> RpcResult<HashMap<JobId, TableFragmentInfo>> {
        Ok(HashMap::default())
    }

    async fn list_streaming_job_states(&self) -> RpcResult<Vec<StreamingJobState>> {
        Ok(vec![])
    }

    async fn list_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
        Ok(vec![])
    }

    async fn list_creating_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
        Ok(vec![])
    }

    async fn list_actor_states(&self) -> RpcResult<Vec<ActorState>> {
        Ok(vec![])
    }

    async fn list_actor_splits(&self) -> RpcResult<Vec<ActorSplit>> {
        Ok(vec![])
    }

    async fn list_object_dependencies(&self) -> RpcResult<Vec<PbObjectDependencies>> {
        Ok(vec![])
    }

    async fn list_meta_snapshots(&self) -> RpcResult<Vec<MetaSnapshotMetadata>> {
        Ok(vec![])
    }

    async fn set_system_param(
        &self,
        _param: String,
        _value: Option<String>,
    ) -> RpcResult<Option<SystemParamsReader>> {
        Ok(Some(SystemParams::default().into()))
    }

    async fn get_session_params(&self) -> RpcResult<SessionConfig> {
        Ok(Default::default())
    }

    async fn set_session_param(&self, _param: String, _value: Option<String>) -> RpcResult<String> {
        Ok("".to_owned())
    }

    async fn get_ddl_progress(&self) -> RpcResult<Vec<DdlProgress>> {
        Ok(vec![])
    }

    async fn get_tables(
        &self,
        _table_ids: Vec<crate::catalog::TableId>,
        _include_dropped_tables: bool,
    ) -> RpcResult<HashMap<crate::catalog::TableId, Table>> {
        Ok(HashMap::new())
    }

    async fn list_hummock_pinned_versions(&self) -> RpcResult<Vec<(WorkerId, u64)>> {
        unimplemented!()
    }

    async fn list_refresh_table_states(&self) -> RpcResult<Vec<RefreshTableState>> {
        unimplemented!()
    }

    async fn get_hummock_current_version(&self) -> RpcResult<HummockVersion> {
        Ok(HummockVersion::default())
    }

    async fn get_hummock_checkpoint_version(&self) -> RpcResult<HummockVersion> {
        unimplemented!()
    }

    async fn list_version_deltas(&self) -> RpcResult<Vec<HummockVersionDelta>> {
        unimplemented!()
    }

    async fn list_branched_objects(&self) -> RpcResult<Vec<BranchedObject>> {
        unimplemented!()
    }

    async fn list_hummock_compaction_group_configs(&self) -> RpcResult<Vec<CompactionGroupInfo>> {
        unimplemented!()
    }

    async fn list_hummock_active_write_limits(&self) -> RpcResult<HashMap<u64, WriteLimit>> {
        unimplemented!()
    }

    async fn list_hummock_meta_configs(&self) -> RpcResult<HashMap<String, String>> {
        unimplemented!()
    }

    async fn list_event_log(&self) -> RpcResult<Vec<EventLog>> {
        Ok(vec![])
    }

    async fn list_compact_task_assignment(&self) -> RpcResult<Vec<CompactTaskAssignment>> {
        unimplemented!()
    }

    async fn list_all_nodes(&self) -> RpcResult<Vec<WorkerNode>> {
        Ok(vec![])
    }

    async fn list_compact_task_progress(&self) -> RpcResult<Vec<CompactTaskProgress>> {
        unimplemented!()
    }

    async fn recover(&self) -> RpcResult<()> {
        unimplemented!()
    }

    async fn apply_throttle(
        &self,
        _kind: PbThrottleTarget,
        _id: u32,
        _rate_limit: Option<u32>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn alter_fragment_parallelism(
        &self,
        _fragment_ids: Vec<FragmentId>,
        _parallelism: Option<PbTableParallelism>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn get_cluster_recovery_status(&self) -> RpcResult<RecoveryStatus> {
        Ok(RecoveryStatus::StatusRunning)
    }

    async fn get_cluster_limits(&self) -> RpcResult<Vec<ClusterLimit>> {
        Ok(vec![])
    }

    async fn list_rate_limits(&self) -> RpcResult<Vec<RateLimitInfo>> {
        Ok(vec![])
    }

    async fn list_cdc_progress(&self) -> RpcResult<HashMap<JobId, PbCdcProgress>> {
        Ok(HashMap::default())
    }

    async fn get_meta_store_endpoint(&self) -> RpcResult<String> {
        unimplemented!()
    }

    async fn alter_sink_props(
        &self,
        _sink_id: SinkId,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
        _connector_conn_ref: Option<ConnectionId>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn alter_iceberg_table_props(
        &self,
        _table_id: TableId,
        _sink_id: SinkId,
        _source_id: SourceId,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
        _connector_conn_ref: Option<ConnectionId>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn alter_source_connector_props(
        &self,
        _source_id: SourceId,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
        _connector_conn_ref: Option<ConnectionId>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn list_hosted_iceberg_tables(&self) -> RpcResult<Vec<IcebergTable>> {
        unimplemented!()
    }

    async fn get_fragment_by_id(
        &self,
        _fragment_id: FragmentId,
    ) -> RpcResult<Option<FragmentDistribution>> {
        unimplemented!()
    }

    async fn get_fragment_vnodes(
        &self,
        _fragment_id: FragmentId,
    ) -> RpcResult<Vec<(ActorId, Vec<u32>)>> {
        unimplemented!()
    }

    async fn get_actor_vnodes(&self, _actor_id: ActorId) -> RpcResult<Vec<u32>> {
        unimplemented!()
    }

    fn worker_id(&self) -> WorkerId {
        0.into()
    }

    async fn set_sync_log_store_aligned(&self, _job_id: JobId, _aligned: bool) -> RpcResult<()> {
        Ok(())
    }

    async fn compact_iceberg_table(&self, _sink_id: SinkId) -> RpcResult<u64> {
        Ok(1)
    }

    async fn expire_iceberg_table_snapshots(&self, _sink_id: SinkId) -> RpcResult<()> {
        Ok(())
    }

    async fn refresh(&self, _request: RefreshRequest) -> RpcResult<RefreshResponse> {
        Ok(RefreshResponse { status: None })
    }

    fn cluster_id(&self) -> &str {
        "test-cluster-uuid"
    }

    async fn list_unmigrated_tables(&self) -> RpcResult<HashMap<crate::catalog::TableId, String>> {
        unimplemented!()
    }
}

#[cfg(test)]
pub static PROTO_FILE_DATA: &str = r#"
    syntax = "proto3";
    package test;
    message TestRecord {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
    }
    message TestRecordAlterType {
      string id = 1;
      Country country = 3;
      int32 zipcode = 4;
      float rate = 5;
    }
    message TestRecordExt {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
      string name = 6;
    }
    message Country {
      string address = 1;
      City city = 2;
      string zipcode = 3;
    }
    message City {
      string address = 1;
      string zipcode = 2;
    }"#;

/// Compiles `proto_data` with `protoc` and returns the resulting descriptor file.
/// (`NamedTempFile` automatically deletes the file when it goes out of scope.)
pub fn create_proto_file(proto_data: &str) -> NamedTempFile {
    let in_file = Builder::new()
        .prefix("temp")
        .suffix(".proto")
        .rand_bytes(8)
        .tempfile()
        .unwrap();

    let out_file = Builder::new()
        .prefix("temp")
        .suffix(".pb")
        .rand_bytes(8)
        .tempfile()
        .unwrap();

    let mut file = in_file.as_file();
    file.write_all(proto_data.as_ref())
        .expect("writing binary to test file");
    file.flush().expect("flush temp file failed");
    let include_path = in_file
        .path()
        .parent()
        .unwrap()
        .to_string_lossy()
        .into_owned();
    let out_path = out_file.path().to_string_lossy().into_owned();
    let in_path = in_file.path().to_string_lossy().into_owned();
    let mut compile = std::process::Command::new("protoc");

    let out = compile
        .arg("--include_imports")
        .arg("-I")
        .arg(include_path)
        .arg(format!("--descriptor_set_out={}", out_path))
        .arg(in_path)
        .output()
        .expect("failed to compile proto");
    if !out.status.success() {
        panic!("compile proto failed\noutput: {:?}", out);
    }
    out_file
}
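
// A usage sketch (requires `protoc` on PATH; `PROTO_FILE_DATA` is the test
// schema defined above):
//
//     let file = create_proto_file(PROTO_FILE_DATA);
//     let location = format!("file://{}", file.path().to_string_lossy());
//     // `location` can be passed wherever tests expect a schema file URL;
//     // the descriptor file is deleted as soon as `file` is dropped.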