risingwave_frontend/
test_utils.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::io::Write;
17use std::net::{IpAddr, Ipv4Addr, SocketAddr};
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU32, Ordering};
20
21use futures_async_stream::for_await;
22use parking_lot::RwLock;
23use pgwire::net::{Address, AddressRef};
24use pgwire::pg_response::StatementType;
25use pgwire::pg_server::{BoxedError, SessionId, SessionManager, UserAuthenticator};
26use pgwire::types::Row;
27use risingwave_common::catalog::{
28    AlterDatabaseParam, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER,
29    DEFAULT_SUPER_USER_ID, FunctionId, IndexId, NON_RESERVED_USER_ID, ObjectId,
30    PG_CATALOG_SCHEMA_NAME, RW_CATALOG_SCHEMA_NAME, TableId,
31};
32use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat};
33use risingwave_common::session_config::SessionConfig;
34use risingwave_common::system_param::reader::SystemParamsReader;
35use risingwave_common::util::cluster_limit::ClusterLimit;
36use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
37use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
38use risingwave_hummock_sdk::{HummockVersionId, INVALID_VERSION_ID};
39use risingwave_pb::backup_service::MetaSnapshotMetadata;
40use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
41use risingwave_pb::catalog::{
42    PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbStreamJobStatus,
43    PbSubscription, PbTable, PbView, Table,
44};
45use risingwave_pb::common::WorkerNode;
46use risingwave_pb::ddl_service::alter_owner_request::Object;
47use risingwave_pb::ddl_service::{
48    DdlProgress, PbTableJobType, ReplaceJobPlan, TableJobType, alter_name_request,
49    alter_set_schema_request, alter_swap_rename_request, create_connection_request,
50};
51use risingwave_pb::hummock::write_limits::WriteLimit;
52use risingwave_pb::hummock::{
53    BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
54};
55use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
56use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
57use risingwave_pb::meta::list_actor_states_response::ActorState;
58use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
59use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
60use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
61use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
62use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
63use risingwave_pb::meta::{
64    EventLog, FragmentDistribution, PbTableParallelism, PbThrottleTarget, RecoveryStatus,
65    RefreshRequest, RefreshResponse, SystemParams,
66};
67use risingwave_pb::secret::PbSecretRef;
68use risingwave_pb::stream_plan::StreamFragmentGraph;
69use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
70use risingwave_pb::user::update_user_request::UpdateField;
71use risingwave_pb::user::{GrantPrivilege, UserInfo};
72use risingwave_rpc_client::error::Result as RpcResult;
73use tempfile::{Builder, NamedTempFile};
74
75use crate::FrontendOpts;
76use crate::catalog::catalog_service::CatalogWriter;
77use crate::catalog::root_catalog::Catalog;
78use crate::catalog::{DatabaseId, SchemaId, SecretId, SinkId};
79use crate::error::{ErrorCode, Result};
80use crate::handler::RwPgResponse;
81use crate::meta_client::FrontendMetaClient;
82use crate::scheduler::HummockSnapshotManagerRef;
83use crate::session::{AuthContext, FrontendEnv, SessionImpl};
84use crate::user::UserId;
85use crate::user::user_manager::UserInfoManager;
86use crate::user::user_service::UserInfoWriter;
87
/// An embedded frontend without starting meta and without starting frontend as a tcp server.
pub struct LocalFrontend {
    /// Options this frontend was constructed with.
    pub opts: FrontendOpts,
    /// Frontend environment; a clone of it is handed to every session this frontend creates.
    env: FrontendEnv,
}
93
94impl SessionManager for LocalFrontend {
95    type Session = SessionImpl;
96
97    fn create_dummy_session(
98        &self,
99        _database_id: u32,
100        _user_name: u32,
101    ) -> std::result::Result<Arc<Self::Session>, BoxedError> {
102        unreachable!()
103    }
104
105    fn connect(
106        &self,
107        _database: &str,
108        _user_name: &str,
109        _peer_addr: AddressRef,
110    ) -> std::result::Result<Arc<Self::Session>, BoxedError> {
111        Ok(self.session_ref())
112    }
113
114    fn cancel_queries_in_session(&self, _session_id: SessionId) {
115        unreachable!()
116    }
117
118    fn cancel_creating_jobs_in_session(&self, _session_id: SessionId) {
119        unreachable!()
120    }
121
122    fn end_session(&self, _session: &Self::Session) {
123        unreachable!()
124    }
125}
126
127impl LocalFrontend {
128    #[expect(clippy::unused_async)]
129    pub async fn new(opts: FrontendOpts) -> Self {
130        let env = FrontendEnv::mock();
131        Self { opts, env }
132    }
133
134    pub async fn run_sql(
135        &self,
136        sql: impl Into<String>,
137    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
138        let sql: Arc<str> = Arc::from(sql.into());
139        self.session_ref().run_statement(sql, vec![]).await
140    }
141
142    pub async fn run_sql_with_session(
143        &self,
144        session_ref: Arc<SessionImpl>,
145        sql: impl Into<String>,
146    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
147        let sql: Arc<str> = Arc::from(sql.into());
148        session_ref.run_statement(sql, vec![]).await
149    }
150
151    pub async fn run_user_sql(
152        &self,
153        sql: impl Into<String>,
154        database: String,
155        user_name: String,
156        user_id: UserId,
157    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
158        let sql: Arc<str> = Arc::from(sql.into());
159        self.session_user_ref(database, user_name, user_id)
160            .run_statement(sql, vec![])
161            .await
162    }
163
164    pub async fn query_formatted_result(&self, sql: impl Into<String>) -> Vec<String> {
165        let mut rsp = self.run_sql(sql).await.unwrap();
166        let mut res = vec![];
167        #[for_await]
168        for row_set in rsp.values_stream() {
169            for row in row_set.unwrap() {
170                res.push(format!("{:?}", row));
171            }
172        }
173        res
174    }
175
176    pub async fn get_explain_output(&self, sql: impl Into<String>) -> String {
177        let mut rsp = self.run_sql(sql).await.unwrap();
178        assert_eq!(rsp.stmt_type(), StatementType::EXPLAIN);
179        let mut res = String::new();
180        #[for_await]
181        for row_set in rsp.values_stream() {
182            for row in row_set.unwrap() {
183                let row: Row = row;
184                let row = row.values()[0].as_ref().unwrap();
185                res += std::str::from_utf8(row).unwrap();
186                res += "\n";
187            }
188        }
189        res
190    }
191
192    /// Creates a new session
193    pub fn session_ref(&self) -> Arc<SessionImpl> {
194        self.session_user_ref(
195            DEFAULT_DATABASE_NAME.to_owned(),
196            DEFAULT_SUPER_USER.to_owned(),
197            DEFAULT_SUPER_USER_ID,
198        )
199    }
200
201    pub fn session_user_ref(
202        &self,
203        database: String,
204        user_name: String,
205        user_id: UserId,
206    ) -> Arc<SessionImpl> {
207        Arc::new(SessionImpl::new(
208            self.env.clone(),
209            AuthContext::new(database, user_name, user_id),
210            UserAuthenticator::None,
211            // Local Frontend use a non-sense id.
212            (0, 0),
213            Address::Tcp(SocketAddr::new(
214                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
215                6666,
216            ))
217            .into(),
218            Default::default(),
219        ))
220    }
221}
222
223pub async fn get_explain_output(mut rsp: RwPgResponse) -> String {
224    if rsp.stmt_type() != StatementType::EXPLAIN {
225        panic!("RESPONSE INVALID: {rsp:?}");
226    }
227    let mut res = String::new();
228    #[for_await]
229    for row_set in rsp.values_stream() {
230        for row in row_set.unwrap() {
231            let row: Row = row;
232            let row = row.values()[0].as_ref().unwrap();
233            res += std::str::from_utf8(row).unwrap();
234            res += "\n";
235        }
236    }
237    res
238}
239
/// In-memory catalog writer that applies catalog changes directly to a shared [`Catalog`].
pub struct MockCatalogWriter {
    /// Shared catalog that every writer method reads and mutates.
    catalog: Arc<RwLock<Catalog>>,
    /// Counter from which ids for newly created catalog objects are generated.
    id: AtomicU32,
    /// Schema of every object (table, view, source, sink, subscription, index) created through
    /// this writer, keyed by the object's id.
    table_id_to_schema_id: RwLock<HashMap<u32, SchemaId>>,
    /// Owning database of every known schema, keyed by schema id.
    schema_id_to_database_id: RwLock<HashMap<u32, DatabaseId>>,
    /// Snapshot manager that is told about newly created materialized views and tables.
    hummock_snapshot_manager: HummockSnapshotManagerRef,
}
247
248#[async_trait::async_trait]
249impl CatalogWriter for MockCatalogWriter {
250    async fn create_database(
251        &self,
252        db_name: &str,
253        owner: UserId,
254        resource_group: &str,
255        barrier_interval_ms: Option<u32>,
256        checkpoint_frequency: Option<u64>,
257    ) -> Result<()> {
258        let database_id = self.gen_id();
259        self.catalog.write().create_database(&PbDatabase {
260            name: db_name.to_owned(),
261            id: database_id,
262            owner,
263            resource_group: resource_group.to_owned(),
264            barrier_interval_ms,
265            checkpoint_frequency,
266        });
267        self.create_schema(database_id, DEFAULT_SCHEMA_NAME, owner)
268            .await?;
269        self.create_schema(database_id, PG_CATALOG_SCHEMA_NAME, owner)
270            .await?;
271        self.create_schema(database_id, RW_CATALOG_SCHEMA_NAME, owner)
272            .await?;
273        Ok(())
274    }
275
276    async fn create_schema(
277        &self,
278        db_id: DatabaseId,
279        schema_name: &str,
280        owner: UserId,
281    ) -> Result<()> {
282        let id = self.gen_id();
283        self.catalog.write().create_schema(&PbSchema {
284            id,
285            name: schema_name.to_owned(),
286            database_id: db_id,
287            owner,
288        });
289        self.add_schema_id(id, db_id);
290        Ok(())
291    }
292
293    async fn create_materialized_view(
294        &self,
295        mut table: PbTable,
296        _graph: StreamFragmentGraph,
297        _dependencies: HashSet<ObjectId>,
298        _specific_resource_group: Option<String>,
299        _if_not_exists: bool,
300    ) -> Result<()> {
301        table.id = self.gen_id();
302        table.stream_job_status = PbStreamJobStatus::Created as _;
303        table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
304        self.catalog.write().create_table(&table);
305        self.add_table_or_source_id(table.id, table.schema_id, table.database_id);
306        self.hummock_snapshot_manager
307            .add_table_for_test(TableId::new(table.id));
308        Ok(())
309    }
310
311    async fn replace_materialized_view(
312        &self,
313        mut table: PbTable,
314        _graph: StreamFragmentGraph,
315    ) -> Result<()> {
316        table.stream_job_status = PbStreamJobStatus::Created as _;
317        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
318        self.catalog.write().update_table(&table);
319        Ok(())
320    }
321
322    async fn create_view(&self, mut view: PbView, _dependencies: HashSet<ObjectId>) -> Result<()> {
323        view.id = self.gen_id();
324        self.catalog.write().create_view(&view);
325        self.add_table_or_source_id(view.id, view.schema_id, view.database_id);
326        Ok(())
327    }
328
329    async fn create_table(
330        &self,
331        source: Option<PbSource>,
332        mut table: PbTable,
333        graph: StreamFragmentGraph,
334        _job_type: PbTableJobType,
335        if_not_exists: bool,
336        _dependencies: HashSet<ObjectId>,
337    ) -> Result<()> {
338        if let Some(source) = source {
339            let source_id = self.create_source_inner(source)?;
340            table.optional_associated_source_id =
341                Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id));
342        }
343        self.create_materialized_view(table, graph, HashSet::new(), None, if_not_exists)
344            .await?;
345        Ok(())
346    }
347
348    async fn replace_table(
349        &self,
350        _source: Option<PbSource>,
351        mut table: PbTable,
352        _graph: StreamFragmentGraph,
353        _job_type: TableJobType,
354    ) -> Result<()> {
355        table.stream_job_status = PbStreamJobStatus::Created as _;
356        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
357        self.catalog.write().update_table(&table);
358        Ok(())
359    }
360
361    async fn replace_source(&self, source: PbSource, _graph: StreamFragmentGraph) -> Result<()> {
362        self.catalog.write().update_source(&source);
363        Ok(())
364    }
365
366    async fn create_source(
367        &self,
368        source: PbSource,
369        _graph: Option<StreamFragmentGraph>,
370        _if_not_exists: bool,
371    ) -> Result<()> {
372        self.create_source_inner(source).map(|_| ())
373    }
374
375    async fn create_sink(
376        &self,
377        sink: PbSink,
378        graph: StreamFragmentGraph,
379        _affected_table_change: Option<ReplaceJobPlan>,
380        _dependencies: HashSet<ObjectId>,
381        _if_not_exists: bool,
382    ) -> Result<()> {
383        self.create_sink_inner(sink, graph)
384    }
385
386    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
387        self.create_subscription_inner(subscription)
388    }
389
390    async fn create_index(
391        &self,
392        mut index: PbIndex,
393        mut index_table: PbTable,
394        _graph: StreamFragmentGraph,
395        _if_not_exists: bool,
396    ) -> Result<()> {
397        index_table.id = self.gen_id();
398        index_table.stream_job_status = PbStreamJobStatus::Created as _;
399        index_table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
400        self.catalog.write().create_table(&index_table);
401        self.add_table_or_index_id(
402            index_table.id,
403            index_table.schema_id,
404            index_table.database_id,
405        );
406
407        index.id = index_table.id;
408        index.index_table_id = index_table.id;
409        self.catalog.write().create_index(&index);
410        Ok(())
411    }
412
413    async fn create_function(&self, _function: PbFunction) -> Result<()> {
414        unreachable!()
415    }
416
417    async fn create_connection(
418        &self,
419        _connection_name: String,
420        _database_id: u32,
421        _schema_id: u32,
422        _owner_id: u32,
423        _connection: create_connection_request::Payload,
424    ) -> Result<()> {
425        unreachable!()
426    }
427
428    async fn create_secret(
429        &self,
430        _secret_name: String,
431        _database_id: u32,
432        _schema_id: u32,
433        _owner_id: u32,
434        _payload: Vec<u8>,
435    ) -> Result<()> {
436        unreachable!()
437    }
438
439    async fn comment_on(&self, _comment: PbComment) -> Result<()> {
440        unreachable!()
441    }
442
443    async fn drop_table(
444        &self,
445        source_id: Option<u32>,
446        table_id: TableId,
447        cascade: bool,
448    ) -> Result<()> {
449        if cascade {
450            return Err(ErrorCode::NotSupported(
451                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
452                "use drop instead".to_owned(),
453            )
454            .into());
455        }
456        if let Some(source_id) = source_id {
457            self.drop_table_or_source_id(source_id);
458        }
459        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.table_id);
460        let indexes =
461            self.catalog
462                .read()
463                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
464        for index in indexes {
465            self.drop_index(index.id, cascade).await?;
466        }
467        self.catalog
468            .write()
469            .drop_table(database_id, schema_id, table_id);
470        if let Some(source_id) = source_id {
471            self.catalog
472                .write()
473                .drop_source(database_id, schema_id, source_id);
474        }
475        Ok(())
476    }
477
478    async fn drop_view(&self, _view_id: u32, _cascade: bool) -> Result<()> {
479        unreachable!()
480    }
481
482    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
483        if cascade {
484            return Err(ErrorCode::NotSupported(
485                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
486                "use drop instead".to_owned(),
487            )
488            .into());
489        }
490        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.table_id);
491        let indexes =
492            self.catalog
493                .read()
494                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
495        for index in indexes {
496            self.drop_index(index.id, cascade).await?;
497        }
498        self.catalog
499            .write()
500            .drop_table(database_id, schema_id, table_id);
501        Ok(())
502    }
503
504    async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<()> {
505        if cascade {
506            return Err(ErrorCode::NotSupported(
507                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
508                "use drop instead".to_owned(),
509            )
510            .into());
511        }
512        let (database_id, schema_id) = self.drop_table_or_source_id(source_id);
513        self.catalog
514            .write()
515            .drop_source(database_id, schema_id, source_id);
516        Ok(())
517    }
518
519    async fn drop_sink(
520        &self,
521        sink_id: u32,
522        cascade: bool,
523        _target_table_change: Option<ReplaceJobPlan>,
524    ) -> Result<()> {
525        if cascade {
526            return Err(ErrorCode::NotSupported(
527                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
528                "use drop instead".to_owned(),
529            )
530            .into());
531        }
532        let (database_id, schema_id) = self.drop_table_or_sink_id(sink_id);
533        self.catalog
534            .write()
535            .drop_sink(database_id, schema_id, sink_id);
536        Ok(())
537    }
538
539    async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()> {
540        if cascade {
541            return Err(ErrorCode::NotSupported(
542                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
543                "use drop instead".to_owned(),
544            )
545            .into());
546        }
547        let (database_id, schema_id) = self.drop_table_or_subscription_id(subscription_id);
548        self.catalog
549            .write()
550            .drop_subscription(database_id, schema_id, subscription_id);
551        Ok(())
552    }
553
554    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
555        if cascade {
556            return Err(ErrorCode::NotSupported(
557                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
558                "use drop instead".to_owned(),
559            )
560            .into());
561        }
562        let &schema_id = self
563            .table_id_to_schema_id
564            .read()
565            .get(&index_id.index_id)
566            .unwrap();
567        let database_id = self.get_database_id_by_schema(schema_id);
568
569        let index = {
570            let catalog_reader = self.catalog.read();
571            let schema_catalog = catalog_reader
572                .get_schema_by_id(&database_id, &schema_id)
573                .unwrap();
574            schema_catalog.get_index_by_id(&index_id).unwrap().clone()
575        };
576
577        let index_table_id = index.index_table().id;
578        let (database_id, schema_id) = self.drop_table_or_index_id(index_id.index_id);
579        self.catalog
580            .write()
581            .drop_index(database_id, schema_id, index_id);
582        self.catalog
583            .write()
584            .drop_table(database_id, schema_id, index_table_id);
585        Ok(())
586    }
587
588    async fn drop_function(&self, _function_id: FunctionId) -> Result<()> {
589        unreachable!()
590    }
591
592    async fn drop_connection(&self, _connection_id: u32, _cascade: bool) -> Result<()> {
593        unreachable!()
594    }
595
596    async fn drop_secret(&self, _secret_id: SecretId) -> Result<()> {
597        unreachable!()
598    }
599
600    async fn drop_database(&self, database_id: u32) -> Result<()> {
601        self.catalog.write().drop_database(database_id);
602        Ok(())
603    }
604
605    async fn drop_schema(&self, schema_id: u32, _cascade: bool) -> Result<()> {
606        let database_id = self.drop_schema_id(schema_id);
607        self.catalog.write().drop_schema(database_id, schema_id);
608        Ok(())
609    }
610
611    async fn alter_name(
612        &self,
613        object_id: alter_name_request::Object,
614        object_name: &str,
615    ) -> Result<()> {
616        match object_id {
617            alter_name_request::Object::TableId(table_id) => {
618                self.catalog
619                    .write()
620                    .alter_table_name_by_id(&table_id.into(), object_name);
621                Ok(())
622            }
623            _ => {
624                unimplemented!()
625            }
626        }
627    }
628
629    async fn alter_source(&self, source: PbSource) -> Result<()> {
630        self.catalog.write().update_source(&source);
631        Ok(())
632    }
633
634    async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<()> {
635        for database in self.catalog.read().iter_databases() {
636            for schema in database.iter_schemas() {
637                match object {
638                    Object::TableId(table_id) => {
639                        if let Some(table) =
640                            schema.get_created_table_by_id(&TableId::from(table_id))
641                        {
642                            let mut pb_table = table.to_prost();
643                            pb_table.owner = owner_id;
644                            self.catalog.write().update_table(&pb_table);
645                            return Ok(());
646                        }
647                    }
648                    _ => unreachable!(),
649                }
650            }
651        }
652
653        Err(ErrorCode::ItemNotFound(format!("object not found: {:?}", object)).into())
654    }
655
656    async fn alter_set_schema(
657        &self,
658        object: alter_set_schema_request::Object,
659        new_schema_id: u32,
660    ) -> Result<()> {
661        match object {
662            alter_set_schema_request::Object::TableId(table_id) => {
663                let mut pb_table = {
664                    let reader = self.catalog.read();
665                    let table = reader.get_any_table_by_id(&table_id.into())?.to_owned();
666                    table.to_prost()
667                };
668                pb_table.schema_id = new_schema_id;
669                self.catalog.write().update_table(&pb_table);
670                self.table_id_to_schema_id
671                    .write()
672                    .insert(table_id, new_schema_id);
673                Ok(())
674            }
675            _ => unreachable!(),
676        }
677    }
678
679    async fn alter_parallelism(
680        &self,
681        _table_id: u32,
682        _parallelism: PbTableParallelism,
683        _deferred: bool,
684    ) -> Result<()> {
685        todo!()
686    }
687
688    async fn alter_swap_rename(&self, _object: alter_swap_rename_request::Object) -> Result<()> {
689        todo!()
690    }
691
692    async fn alter_secret(
693        &self,
694        _secret_id: u32,
695        _secret_name: String,
696        _database_id: u32,
697        _schema_id: u32,
698        _owner_id: u32,
699        _payload: Vec<u8>,
700    ) -> Result<()> {
701        unreachable!()
702    }
703
704    async fn alter_resource_group(
705        &self,
706        _table_id: u32,
707        _resource_group: Option<String>,
708        _deferred: bool,
709    ) -> Result<()> {
710        todo!()
711    }
712
713    async fn alter_database_param(
714        &self,
715        database_id: u32,
716        param: AlterDatabaseParam,
717    ) -> Result<()> {
718        let mut pb_database = {
719            let reader = self.catalog.read();
720            let database = reader.get_database_by_id(&database_id)?.to_owned();
721            database.to_prost()
722        };
723        match param {
724            AlterDatabaseParam::BarrierIntervalMs(interval) => {
725                pb_database.barrier_interval_ms = interval;
726            }
727            AlterDatabaseParam::CheckpointFrequency(frequency) => {
728                pb_database.checkpoint_frequency = frequency;
729            }
730        }
731        self.catalog.write().update_database(&pb_database);
732        Ok(())
733    }
734}
735
736impl MockCatalogWriter {
737    pub fn new(
738        catalog: Arc<RwLock<Catalog>>,
739        hummock_snapshot_manager: HummockSnapshotManagerRef,
740    ) -> Self {
741        catalog.write().create_database(&PbDatabase {
742            id: 0,
743            name: DEFAULT_DATABASE_NAME.to_owned(),
744            owner: DEFAULT_SUPER_USER_ID,
745            resource_group: DEFAULT_RESOURCE_GROUP.to_owned(),
746            barrier_interval_ms: None,
747            checkpoint_frequency: None,
748        });
749        catalog.write().create_schema(&PbSchema {
750            id: 1,
751            name: DEFAULT_SCHEMA_NAME.to_owned(),
752            database_id: 0,
753            owner: DEFAULT_SUPER_USER_ID,
754        });
755        catalog.write().create_schema(&PbSchema {
756            id: 2,
757            name: PG_CATALOG_SCHEMA_NAME.to_owned(),
758            database_id: 0,
759            owner: DEFAULT_SUPER_USER_ID,
760        });
761        catalog.write().create_schema(&PbSchema {
762            id: 3,
763            name: RW_CATALOG_SCHEMA_NAME.to_owned(),
764            database_id: 0,
765            owner: DEFAULT_SUPER_USER_ID,
766        });
767        let mut map: HashMap<u32, DatabaseId> = HashMap::new();
768        map.insert(1_u32, 0_u32);
769        map.insert(2_u32, 0_u32);
770        map.insert(3_u32, 0_u32);
771        Self {
772            catalog,
773            id: AtomicU32::new(3),
774            table_id_to_schema_id: Default::default(),
775            schema_id_to_database_id: RwLock::new(map),
776            hummock_snapshot_manager,
777        }
778    }
779
780    fn gen_id(&self) -> u32 {
781        // Since the 0 value is `dev` schema and database, so jump out the 0 value.
782        self.id.fetch_add(1, Ordering::SeqCst) + 1
783    }
784
785    fn add_table_or_source_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
786        self.table_id_to_schema_id
787            .write()
788            .insert(table_id, schema_id);
789    }
790
791    fn drop_table_or_source_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
792        let schema_id = self
793            .table_id_to_schema_id
794            .write()
795            .remove(&table_id)
796            .unwrap();
797        (self.get_database_id_by_schema(schema_id), schema_id)
798    }
799
800    fn add_table_or_sink_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
801        self.table_id_to_schema_id
802            .write()
803            .insert(table_id, schema_id);
804    }
805
806    fn add_table_or_subscription_id(
807        &self,
808        table_id: u32,
809        schema_id: SchemaId,
810        _database_id: DatabaseId,
811    ) {
812        self.table_id_to_schema_id
813            .write()
814            .insert(table_id, schema_id);
815    }
816
817    fn add_table_or_index_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
818        self.table_id_to_schema_id
819            .write()
820            .insert(table_id, schema_id);
821    }
822
823    fn drop_table_or_sink_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
824        let schema_id = self
825            .table_id_to_schema_id
826            .write()
827            .remove(&table_id)
828            .unwrap();
829        (self.get_database_id_by_schema(schema_id), schema_id)
830    }
831
832    fn drop_table_or_subscription_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
833        let schema_id = self
834            .table_id_to_schema_id
835            .write()
836            .remove(&table_id)
837            .unwrap();
838        (self.get_database_id_by_schema(schema_id), schema_id)
839    }
840
841    fn drop_table_or_index_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
842        let schema_id = self
843            .table_id_to_schema_id
844            .write()
845            .remove(&table_id)
846            .unwrap();
847        (self.get_database_id_by_schema(schema_id), schema_id)
848    }
849
850    fn add_schema_id(&self, schema_id: u32, database_id: DatabaseId) {
851        self.schema_id_to_database_id
852            .write()
853            .insert(schema_id, database_id);
854    }
855
856    fn drop_schema_id(&self, schema_id: u32) -> DatabaseId {
857        self.schema_id_to_database_id
858            .write()
859            .remove(&schema_id)
860            .unwrap()
861    }
862
863    fn create_source_inner(&self, mut source: PbSource) -> Result<u32> {
864        source.id = self.gen_id();
865        self.catalog.write().create_source(&source);
866        self.add_table_or_source_id(source.id, source.schema_id, source.database_id);
867        Ok(source.id)
868    }
869
870    fn create_sink_inner(&self, mut sink: PbSink, _graph: StreamFragmentGraph) -> Result<()> {
871        sink.id = self.gen_id();
872        sink.stream_job_status = PbStreamJobStatus::Created as _;
873        self.catalog.write().create_sink(&sink);
874        self.add_table_or_sink_id(sink.id, sink.schema_id, sink.database_id);
875        Ok(())
876    }
877
878    fn create_subscription_inner(&self, mut subscription: PbSubscription) -> Result<()> {
879        subscription.id = self.gen_id();
880        self.catalog.write().create_subscription(&subscription);
881        self.add_table_or_subscription_id(
882            subscription.id,
883            subscription.schema_id,
884            subscription.database_id,
885        );
886        Ok(())
887    }
888
889    fn get_database_id_by_schema(&self, schema_id: u32) -> DatabaseId {
890        *self
891            .schema_id_to_database_id
892            .read()
893            .get(&schema_id)
894            .unwrap()
895    }
896}
897
898pub struct MockUserInfoWriter {
899    id: AtomicU32,
900    user_info: Arc<RwLock<UserInfoManager>>,
901}
902
903#[async_trait::async_trait]
904impl UserInfoWriter for MockUserInfoWriter {
905    async fn create_user(&self, user: UserInfo) -> Result<()> {
906        let mut user = user;
907        user.id = self.gen_id();
908        self.user_info.write().create_user(user);
909        Ok(())
910    }
911
912    async fn drop_user(&self, id: UserId) -> Result<()> {
913        self.user_info.write().drop_user(id);
914        Ok(())
915    }
916
917    async fn update_user(
918        &self,
919        update_user: UserInfo,
920        update_fields: Vec<UpdateField>,
921    ) -> Result<()> {
922        let mut lock = self.user_info.write();
923        let id = update_user.get_id();
924        let Some(old_name) = lock.get_user_name_by_id(id) else {
925            return Ok(());
926        };
927        let mut user_info = lock.get_user_by_name(&old_name).unwrap().to_prost();
928        update_fields.into_iter().for_each(|field| match field {
929            UpdateField::Super => user_info.is_super = update_user.is_super,
930            UpdateField::Login => user_info.can_login = update_user.can_login,
931            UpdateField::CreateDb => user_info.can_create_db = update_user.can_create_db,
932            UpdateField::CreateUser => user_info.can_create_user = update_user.can_create_user,
933            UpdateField::AuthInfo => user_info.auth_info.clone_from(&update_user.auth_info),
934            UpdateField::Rename => user_info.name.clone_from(&update_user.name),
935            UpdateField::Unspecified => unreachable!(),
936        });
937        lock.update_user(update_user);
938        Ok(())
939    }
940
941    /// In `MockUserInfoWriter`, we don't support expand privilege with `GrantAllTables` and
942    /// `GrantAllSources` when grant privilege to user.
943    async fn grant_privilege(
944        &self,
945        users: Vec<UserId>,
946        privileges: Vec<GrantPrivilege>,
947        with_grant_option: bool,
948        _grantor: UserId,
949    ) -> Result<()> {
950        let privileges = privileges
951            .into_iter()
952            .map(|mut p| {
953                p.action_with_opts
954                    .iter_mut()
955                    .for_each(|ao| ao.with_grant_option = with_grant_option);
956                p
957            })
958            .collect::<Vec<_>>();
959        for user_id in users {
960            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
961                u.extend_privileges(privileges.clone());
962            }
963        }
964        Ok(())
965    }
966
967    /// In `MockUserInfoWriter`, we don't support expand privilege with `RevokeAllTables` and
968    /// `RevokeAllSources` when revoke privilege from user.
969    async fn revoke_privilege(
970        &self,
971        users: Vec<UserId>,
972        privileges: Vec<GrantPrivilege>,
973        _granted_by: UserId,
974        _revoke_by: UserId,
975        revoke_grant_option: bool,
976        _cascade: bool,
977    ) -> Result<()> {
978        for user_id in users {
979            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
980                u.revoke_privileges(privileges.clone(), revoke_grant_option);
981            }
982        }
983        Ok(())
984    }
985
986    async fn alter_default_privilege(
987        &self,
988        _users: Vec<UserId>,
989        _database_id: DatabaseId,
990        _schemas: Vec<SchemaId>,
991        _operation: AlterDefaultPrivilegeOperation,
992        _operated_by: UserId,
993    ) -> Result<()> {
994        todo!()
995    }
996}
997
998impl MockUserInfoWriter {
999    pub fn new(user_info: Arc<RwLock<UserInfoManager>>) -> Self {
1000        user_info.write().create_user(UserInfo {
1001            id: DEFAULT_SUPER_USER_ID,
1002            name: DEFAULT_SUPER_USER.to_owned(),
1003            is_super: true,
1004            can_create_db: true,
1005            can_create_user: true,
1006            can_login: true,
1007            ..Default::default()
1008        });
1009        Self {
1010            user_info,
1011            id: AtomicU32::new(NON_RESERVED_USER_ID as u32),
1012        }
1013    }
1014
1015    fn gen_id(&self) -> u32 {
1016        self.id.fetch_add(1, Ordering::SeqCst)
1017    }
1018}
1019
1020pub struct MockFrontendMetaClient {}
1021
1022#[async_trait::async_trait]
1023impl FrontendMetaClient for MockFrontendMetaClient {
1024    async fn try_unregister(&self) {}
1025
1026    async fn flush(&self, _database_id: DatabaseId) -> RpcResult<HummockVersionId> {
1027        Ok(INVALID_VERSION_ID)
1028    }
1029
1030    async fn wait(&self) -> RpcResult<()> {
1031        Ok(())
1032    }
1033
1034    async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult<Vec<u32>> {
1035        Ok(vec![])
1036    }
1037
1038    async fn list_table_fragments(
1039        &self,
1040        _table_ids: &[u32],
1041    ) -> RpcResult<HashMap<u32, TableFragmentInfo>> {
1042        Ok(HashMap::default())
1043    }
1044
1045    async fn list_streaming_job_states(&self) -> RpcResult<Vec<StreamingJobState>> {
1046        Ok(vec![])
1047    }
1048
1049    async fn list_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
1050        Ok(vec![])
1051    }
1052
1053    async fn list_creating_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
1054        Ok(vec![])
1055    }
1056
1057    async fn list_actor_states(&self) -> RpcResult<Vec<ActorState>> {
1058        Ok(vec![])
1059    }
1060
1061    async fn list_actor_splits(&self) -> RpcResult<Vec<ActorSplit>> {
1062        Ok(vec![])
1063    }
1064
1065    async fn list_object_dependencies(&self) -> RpcResult<Vec<PbObjectDependencies>> {
1066        Ok(vec![])
1067    }
1068
1069    async fn list_meta_snapshots(&self) -> RpcResult<Vec<MetaSnapshotMetadata>> {
1070        Ok(vec![])
1071    }
1072
1073    async fn set_system_param(
1074        &self,
1075        _param: String,
1076        _value: Option<String>,
1077    ) -> RpcResult<Option<SystemParamsReader>> {
1078        Ok(Some(SystemParams::default().into()))
1079    }
1080
1081    async fn get_session_params(&self) -> RpcResult<SessionConfig> {
1082        Ok(Default::default())
1083    }
1084
1085    async fn set_session_param(&self, _param: String, _value: Option<String>) -> RpcResult<String> {
1086        Ok("".to_owned())
1087    }
1088
1089    async fn get_ddl_progress(&self) -> RpcResult<Vec<DdlProgress>> {
1090        Ok(vec![])
1091    }
1092
1093    async fn get_tables(
1094        &self,
1095        _table_ids: &[u32],
1096        _include_dropped_tables: bool,
1097    ) -> RpcResult<HashMap<u32, Table>> {
1098        Ok(HashMap::new())
1099    }
1100
1101    async fn list_hummock_pinned_versions(&self) -> RpcResult<Vec<(u32, u64)>> {
1102        unimplemented!()
1103    }
1104
1105    async fn get_hummock_current_version(&self) -> RpcResult<HummockVersion> {
1106        Ok(HummockVersion::default())
1107    }
1108
1109    async fn get_hummock_checkpoint_version(&self) -> RpcResult<HummockVersion> {
1110        unimplemented!()
1111    }
1112
1113    async fn list_version_deltas(&self) -> RpcResult<Vec<HummockVersionDelta>> {
1114        unimplemented!()
1115    }
1116
1117    async fn list_branched_objects(&self) -> RpcResult<Vec<BranchedObject>> {
1118        unimplemented!()
1119    }
1120
1121    async fn list_hummock_compaction_group_configs(&self) -> RpcResult<Vec<CompactionGroupInfo>> {
1122        unimplemented!()
1123    }
1124
1125    async fn list_hummock_active_write_limits(&self) -> RpcResult<HashMap<u64, WriteLimit>> {
1126        unimplemented!()
1127    }
1128
1129    async fn list_hummock_meta_configs(&self) -> RpcResult<HashMap<String, String>> {
1130        unimplemented!()
1131    }
1132
1133    async fn list_event_log(&self) -> RpcResult<Vec<EventLog>> {
1134        unimplemented!()
1135    }
1136
1137    async fn list_compact_task_assignment(&self) -> RpcResult<Vec<CompactTaskAssignment>> {
1138        unimplemented!()
1139    }
1140
1141    async fn list_all_nodes(&self) -> RpcResult<Vec<WorkerNode>> {
1142        Ok(vec![])
1143    }
1144
1145    async fn list_compact_task_progress(&self) -> RpcResult<Vec<CompactTaskProgress>> {
1146        unimplemented!()
1147    }
1148
1149    async fn recover(&self) -> RpcResult<()> {
1150        unimplemented!()
1151    }
1152
1153    async fn apply_throttle(
1154        &self,
1155        _kind: PbThrottleTarget,
1156        _id: u32,
1157        _rate_limit: Option<u32>,
1158    ) -> RpcResult<()> {
1159        unimplemented!()
1160    }
1161
1162    async fn get_cluster_recovery_status(&self) -> RpcResult<RecoveryStatus> {
1163        Ok(RecoveryStatus::StatusRunning)
1164    }
1165
1166    async fn get_cluster_limits(&self) -> RpcResult<Vec<ClusterLimit>> {
1167        Ok(vec![])
1168    }
1169
1170    async fn list_rate_limits(&self) -> RpcResult<Vec<RateLimitInfo>> {
1171        Ok(vec![])
1172    }
1173
1174    async fn get_meta_store_endpoint(&self) -> RpcResult<String> {
1175        unimplemented!()
1176    }
1177
1178    async fn alter_sink_props(
1179        &self,
1180        _sink_id: u32,
1181        _changed_props: BTreeMap<String, String>,
1182        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1183        _connector_conn_ref: Option<u32>,
1184    ) -> RpcResult<()> {
1185        unimplemented!()
1186    }
1187
1188    async fn alter_source_connector_props(
1189        &self,
1190        _source_id: u32,
1191        _changed_props: BTreeMap<String, String>,
1192        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1193        _connector_conn_ref: Option<u32>,
1194    ) -> RpcResult<()> {
1195        unimplemented!()
1196    }
1197
1198    async fn list_hosted_iceberg_tables(&self) -> RpcResult<Vec<IcebergTable>> {
1199        unimplemented!()
1200    }
1201
1202    async fn get_fragment_by_id(
1203        &self,
1204        _fragment_id: u32,
1205    ) -> RpcResult<Option<FragmentDistribution>> {
1206        unimplemented!()
1207    }
1208
1209    fn worker_id(&self) -> u32 {
1210        0
1211    }
1212
1213    async fn set_sync_log_store_aligned(&self, _job_id: u32, _aligned: bool) -> RpcResult<()> {
1214        Ok(())
1215    }
1216
1217    async fn compact_iceberg_table(&self, _sink_id: SinkId) -> RpcResult<u64> {
1218        Ok(1)
1219    }
1220
1221    async fn refresh(&self, _request: RefreshRequest) -> RpcResult<RefreshResponse> {
1222        Ok(RefreshResponse { status: None })
1223    }
1224}
1225
/// Proto3 schema text used as a test fixture (only compiled under `cfg(test)`).
///
/// Declares, in package `test`, the messages `TestRecord`, `TestRecordAlterType` (same field
/// numbers as `TestRecord` but with `id` as `string` and `zipcode` as `int32`), `TestRecordExt`
/// (`TestRecord` plus a `name` field), and the nested `Country` / `City` messages.
#[cfg(test)]
pub static PROTO_FILE_DATA: &str = r#"
    syntax = "proto3";
    package test;
    message TestRecord {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
    }
    message TestRecordAlterType {
        string id = 1;
        Country country = 3;
        int32 zipcode = 4;
        float rate = 5;
      }
    message TestRecordExt {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
      string name = 6;
    }
    message Country {
      string address = 1;
      City city = 2;
      string zipcode = 3;
    }
    message City {
      string address = 1;
      string zipcode = 2;
    }"#;
1258
1259/// Returns the file.
1260/// (`NamedTempFile` will automatically delete the file when it goes out of scope.)
1261pub fn create_proto_file(proto_data: &str) -> NamedTempFile {
1262    let in_file = Builder::new()
1263        .prefix("temp")
1264        .suffix(".proto")
1265        .rand_bytes(8)
1266        .tempfile()
1267        .unwrap();
1268
1269    let out_file = Builder::new()
1270        .prefix("temp")
1271        .suffix(".pb")
1272        .rand_bytes(8)
1273        .tempfile()
1274        .unwrap();
1275
1276    let mut file = in_file.as_file();
1277    file.write_all(proto_data.as_ref())
1278        .expect("writing binary to test file");
1279    file.flush().expect("flush temp file failed");
1280    let include_path = in_file
1281        .path()
1282        .parent()
1283        .unwrap()
1284        .to_string_lossy()
1285        .into_owned();
1286    let out_path = out_file.path().to_string_lossy().into_owned();
1287    let in_path = in_file.path().to_string_lossy().into_owned();
1288    let mut compile = std::process::Command::new("protoc");
1289
1290    let out = compile
1291        .arg("--include_imports")
1292        .arg("-I")
1293        .arg(include_path)
1294        .arg(format!("--descriptor_set_out={}", out_path))
1295        .arg(in_path)
1296        .output()
1297        .expect("failed to compile proto");
1298    if !out.status.success() {
1299        panic!("compile proto failed \n output: {:?}", out);
1300    }
1301    out_file
1302}