risingwave_frontend/
test_utils.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::io::Write;
17use std::net::{IpAddr, Ipv4Addr, SocketAddr};
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU32, Ordering};
20
21use futures_async_stream::for_await;
22use parking_lot::RwLock;
23use pgwire::net::{Address, AddressRef};
24use pgwire::pg_response::StatementType;
25use pgwire::pg_server::{BoxedError, SessionId, SessionManager, UserAuthenticator};
26use pgwire::types::Row;
27use risingwave_common::catalog::{
28    AlterDatabaseParam, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER,
29    DEFAULT_SUPER_USER_FOR_ADMIN, DEFAULT_SUPER_USER_FOR_ADMIN_ID, DEFAULT_SUPER_USER_ID,
30    FunctionId, IndexId, NON_RESERVED_USER_ID, ObjectId, PG_CATALOG_SCHEMA_NAME,
31    RW_CATALOG_SCHEMA_NAME, TableId,
32};
33use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat};
34use risingwave_common::session_config::SessionConfig;
35use risingwave_common::system_param::reader::SystemParamsReader;
36use risingwave_common::util::cluster_limit::ClusterLimit;
37use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
38use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
39use risingwave_hummock_sdk::{HummockVersionId, INVALID_VERSION_ID};
40use risingwave_pb::backup_service::MetaSnapshotMetadata;
41use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
42use risingwave_pb::catalog::{
43    PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbStreamJobStatus,
44    PbSubscription, PbTable, PbView, Table,
45};
46use risingwave_pb::common::WorkerNode;
47use risingwave_pb::ddl_service::alter_owner_request::Object;
48use risingwave_pb::ddl_service::{
49    DdlProgress, PbTableJobType, TableJobType, alter_name_request, alter_set_schema_request,
50    alter_swap_rename_request, create_connection_request,
51};
52use risingwave_pb::hummock::write_limits::WriteLimit;
53use risingwave_pb::hummock::{
54    BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
55};
56use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
57use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
58use risingwave_pb::meta::list_actor_states_response::ActorState;
59use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
60use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
61use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
62use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
63use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
64use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
65use risingwave_pb::meta::{
66    EventLog, FragmentDistribution, PbTableParallelism, PbThrottleTarget, RecoveryStatus,
67    RefreshRequest, RefreshResponse, SystemParams,
68};
69use risingwave_pb::secret::PbSecretRef;
70use risingwave_pb::stream_plan::StreamFragmentGraph;
71use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
72use risingwave_pb::user::update_user_request::UpdateField;
73use risingwave_pb::user::{GrantPrivilege, UserInfo};
74use risingwave_rpc_client::error::Result as RpcResult;
75use tempfile::{Builder, NamedTempFile};
76
77use crate::FrontendOpts;
78use crate::catalog::catalog_service::CatalogWriter;
79use crate::catalog::root_catalog::Catalog;
80use crate::catalog::{DatabaseId, SchemaId, SecretId, SinkId};
81use crate::error::{ErrorCode, Result};
82use crate::handler::RwPgResponse;
83use crate::meta_client::FrontendMetaClient;
84use crate::scheduler::HummockSnapshotManagerRef;
85use crate::session::{AuthContext, FrontendEnv, SessionImpl};
86use crate::user::UserId;
87use crate::user::user_manager::UserInfoManager;
88use crate::user::user_service::UserInfoWriter;
89
90/// An embedded frontend without starting meta and without starting frontend as a tcp server.
91pub struct LocalFrontend {
92    pub opts: FrontendOpts,
93    env: FrontendEnv,
94}
95
96impl SessionManager for LocalFrontend {
97    type Session = SessionImpl;
98
99    fn create_dummy_session(
100        &self,
101        _database_id: u32,
102        _user_name: u32,
103    ) -> std::result::Result<Arc<Self::Session>, BoxedError> {
104        unreachable!()
105    }
106
107    fn connect(
108        &self,
109        _database: &str,
110        _user_name: &str,
111        _peer_addr: AddressRef,
112    ) -> std::result::Result<Arc<Self::Session>, BoxedError> {
113        Ok(self.session_ref())
114    }
115
116    fn cancel_queries_in_session(&self, _session_id: SessionId) {
117        unreachable!()
118    }
119
120    fn cancel_creating_jobs_in_session(&self, _session_id: SessionId) {
121        unreachable!()
122    }
123
124    fn end_session(&self, _session: &Self::Session) {
125        unreachable!()
126    }
127}
128
129impl LocalFrontend {
130    #[expect(clippy::unused_async)]
131    pub async fn new(opts: FrontendOpts) -> Self {
132        let env = FrontendEnv::mock();
133        Self { opts, env }
134    }
135
136    pub async fn run_sql(
137        &self,
138        sql: impl Into<String>,
139    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
140        let sql: Arc<str> = Arc::from(sql.into());
141        self.session_ref().run_statement(sql, vec![]).await
142    }
143
144    pub async fn run_sql_with_session(
145        &self,
146        session_ref: Arc<SessionImpl>,
147        sql: impl Into<String>,
148    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
149        let sql: Arc<str> = Arc::from(sql.into());
150        session_ref.run_statement(sql, vec![]).await
151    }
152
153    pub async fn run_user_sql(
154        &self,
155        sql: impl Into<String>,
156        database: String,
157        user_name: String,
158        user_id: UserId,
159    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
160        let sql: Arc<str> = Arc::from(sql.into());
161        self.session_user_ref(database, user_name, user_id)
162            .run_statement(sql, vec![])
163            .await
164    }
165
166    pub async fn query_formatted_result(&self, sql: impl Into<String>) -> Vec<String> {
167        let mut rsp = self.run_sql(sql).await.unwrap();
168        let mut res = vec![];
169        #[for_await]
170        for row_set in rsp.values_stream() {
171            for row in row_set.unwrap() {
172                res.push(format!("{:?}", row));
173            }
174        }
175        res
176    }
177
178    pub async fn get_explain_output(&self, sql: impl Into<String>) -> String {
179        let mut rsp = self.run_sql(sql).await.unwrap();
180        assert_eq!(rsp.stmt_type(), StatementType::EXPLAIN);
181        let mut res = String::new();
182        #[for_await]
183        for row_set in rsp.values_stream() {
184            for row in row_set.unwrap() {
185                let row: Row = row;
186                let row = row.values()[0].as_ref().unwrap();
187                res += std::str::from_utf8(row).unwrap();
188                res += "\n";
189            }
190        }
191        res
192    }
193
194    /// Creates a new session
195    pub fn session_ref(&self) -> Arc<SessionImpl> {
196        self.session_user_ref(
197            DEFAULT_DATABASE_NAME.to_owned(),
198            DEFAULT_SUPER_USER.to_owned(),
199            DEFAULT_SUPER_USER_ID,
200        )
201    }
202
203    pub fn session_user_ref(
204        &self,
205        database: String,
206        user_name: String,
207        user_id: UserId,
208    ) -> Arc<SessionImpl> {
209        Arc::new(SessionImpl::new(
210            self.env.clone(),
211            AuthContext::new(database, user_name, user_id),
212            UserAuthenticator::None,
213            // Local Frontend use a non-sense id.
214            (0, 0),
215            Address::Tcp(SocketAddr::new(
216                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
217                6666,
218            ))
219            .into(),
220            Default::default(),
221        ))
222    }
223}
224
225pub async fn get_explain_output(mut rsp: RwPgResponse) -> String {
226    if rsp.stmt_type() != StatementType::EXPLAIN {
227        panic!("RESPONSE INVALID: {rsp:?}");
228    }
229    let mut res = String::new();
230    #[for_await]
231    for row_set in rsp.values_stream() {
232        for row in row_set.unwrap() {
233            let row: Row = row;
234            let row = row.values()[0].as_ref().unwrap();
235            res += std::str::from_utf8(row).unwrap();
236            res += "\n";
237        }
238    }
239    res
240}
241
242pub struct MockCatalogWriter {
243    catalog: Arc<RwLock<Catalog>>,
244    id: AtomicU32,
245    table_id_to_schema_id: RwLock<HashMap<u32, SchemaId>>,
246    schema_id_to_database_id: RwLock<HashMap<u32, DatabaseId>>,
247    hummock_snapshot_manager: HummockSnapshotManagerRef,
248}
249
250#[async_trait::async_trait]
251impl CatalogWriter for MockCatalogWriter {
252    async fn create_database(
253        &self,
254        db_name: &str,
255        owner: UserId,
256        resource_group: &str,
257        barrier_interval_ms: Option<u32>,
258        checkpoint_frequency: Option<u64>,
259    ) -> Result<()> {
260        let database_id = self.gen_id();
261        self.catalog.write().create_database(&PbDatabase {
262            name: db_name.to_owned(),
263            id: database_id,
264            owner,
265            resource_group: resource_group.to_owned(),
266            barrier_interval_ms,
267            checkpoint_frequency,
268        });
269        self.create_schema(database_id, DEFAULT_SCHEMA_NAME, owner)
270            .await?;
271        self.create_schema(database_id, PG_CATALOG_SCHEMA_NAME, owner)
272            .await?;
273        self.create_schema(database_id, RW_CATALOG_SCHEMA_NAME, owner)
274            .await?;
275        Ok(())
276    }
277
278    async fn create_schema(
279        &self,
280        db_id: DatabaseId,
281        schema_name: &str,
282        owner: UserId,
283    ) -> Result<()> {
284        let id = self.gen_id();
285        self.catalog.write().create_schema(&PbSchema {
286            id,
287            name: schema_name.to_owned(),
288            database_id: db_id,
289            owner,
290        });
291        self.add_schema_id(id, db_id);
292        Ok(())
293    }
294
295    async fn create_materialized_view(
296        &self,
297        mut table: PbTable,
298        _graph: StreamFragmentGraph,
299        _dependencies: HashSet<ObjectId>,
300        _specific_resource_group: Option<String>,
301        _if_not_exists: bool,
302    ) -> Result<()> {
303        table.id = self.gen_id();
304        table.stream_job_status = PbStreamJobStatus::Created as _;
305        table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
306        self.catalog.write().create_table(&table);
307        self.add_table_or_source_id(table.id, table.schema_id, table.database_id);
308        self.hummock_snapshot_manager
309            .add_table_for_test(TableId::new(table.id));
310        Ok(())
311    }
312
313    async fn replace_materialized_view(
314        &self,
315        mut table: PbTable,
316        _graph: StreamFragmentGraph,
317    ) -> Result<()> {
318        table.stream_job_status = PbStreamJobStatus::Created as _;
319        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
320        self.catalog.write().update_table(&table);
321        Ok(())
322    }
323
324    async fn create_view(&self, mut view: PbView, _dependencies: HashSet<ObjectId>) -> Result<()> {
325        view.id = self.gen_id();
326        self.catalog.write().create_view(&view);
327        self.add_table_or_source_id(view.id, view.schema_id, view.database_id);
328        Ok(())
329    }
330
331    async fn create_table(
332        &self,
333        source: Option<PbSource>,
334        mut table: PbTable,
335        graph: StreamFragmentGraph,
336        _job_type: PbTableJobType,
337        if_not_exists: bool,
338        _dependencies: HashSet<ObjectId>,
339    ) -> Result<()> {
340        if let Some(source) = source {
341            let source_id = self.create_source_inner(source)?;
342            table.optional_associated_source_id =
343                Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id));
344        }
345        self.create_materialized_view(table, graph, HashSet::new(), None, if_not_exists)
346            .await?;
347        Ok(())
348    }
349
350    async fn replace_table(
351        &self,
352        _source: Option<PbSource>,
353        mut table: PbTable,
354        _graph: StreamFragmentGraph,
355        _job_type: TableJobType,
356    ) -> Result<()> {
357        table.stream_job_status = PbStreamJobStatus::Created as _;
358        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
359        self.catalog.write().update_table(&table);
360        Ok(())
361    }
362
363    async fn replace_source(&self, source: PbSource, _graph: StreamFragmentGraph) -> Result<()> {
364        self.catalog.write().update_source(&source);
365        Ok(())
366    }
367
368    async fn create_source(
369        &self,
370        source: PbSource,
371        _graph: Option<StreamFragmentGraph>,
372        _if_not_exists: bool,
373    ) -> Result<()> {
374        self.create_source_inner(source).map(|_| ())
375    }
376
377    async fn create_sink(
378        &self,
379        sink: PbSink,
380        graph: StreamFragmentGraph,
381        _dependencies: HashSet<ObjectId>,
382        _if_not_exists: bool,
383    ) -> Result<()> {
384        self.create_sink_inner(sink, graph)
385    }
386
387    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
388        self.create_subscription_inner(subscription)
389    }
390
391    async fn create_index(
392        &self,
393        mut index: PbIndex,
394        mut index_table: PbTable,
395        _graph: StreamFragmentGraph,
396        _if_not_exists: bool,
397    ) -> Result<()> {
398        index_table.id = self.gen_id();
399        index_table.stream_job_status = PbStreamJobStatus::Created as _;
400        index_table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
401        self.catalog.write().create_table(&index_table);
402        self.add_table_or_index_id(
403            index_table.id,
404            index_table.schema_id,
405            index_table.database_id,
406        );
407
408        index.id = index_table.id;
409        index.index_table_id = index_table.id;
410        self.catalog.write().create_index(&index);
411        Ok(())
412    }
413
414    async fn create_function(&self, _function: PbFunction) -> Result<()> {
415        unreachable!()
416    }
417
418    async fn create_connection(
419        &self,
420        _connection_name: String,
421        _database_id: u32,
422        _schema_id: u32,
423        _owner_id: u32,
424        _connection: create_connection_request::Payload,
425    ) -> Result<()> {
426        unreachable!()
427    }
428
429    async fn create_secret(
430        &self,
431        _secret_name: String,
432        _database_id: u32,
433        _schema_id: u32,
434        _owner_id: u32,
435        _payload: Vec<u8>,
436    ) -> Result<()> {
437        unreachable!()
438    }
439
440    async fn comment_on(&self, _comment: PbComment) -> Result<()> {
441        unreachable!()
442    }
443
444    async fn drop_table(
445        &self,
446        source_id: Option<u32>,
447        table_id: TableId,
448        cascade: bool,
449    ) -> Result<()> {
450        if cascade {
451            return Err(ErrorCode::NotSupported(
452                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
453                "use drop instead".to_owned(),
454            )
455            .into());
456        }
457        if let Some(source_id) = source_id {
458            self.drop_table_or_source_id(source_id);
459        }
460        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.table_id);
461        let indexes =
462            self.catalog
463                .read()
464                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
465        for index in indexes {
466            self.drop_index(index.id, cascade).await?;
467        }
468        self.catalog
469            .write()
470            .drop_table(database_id, schema_id, table_id);
471        if let Some(source_id) = source_id {
472            self.catalog
473                .write()
474                .drop_source(database_id, schema_id, source_id);
475        }
476        Ok(())
477    }
478
479    async fn drop_view(&self, _view_id: u32, _cascade: bool) -> Result<()> {
480        unreachable!()
481    }
482
483    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
484        if cascade {
485            return Err(ErrorCode::NotSupported(
486                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
487                "use drop instead".to_owned(),
488            )
489            .into());
490        }
491        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.table_id);
492        let indexes =
493            self.catalog
494                .read()
495                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
496        for index in indexes {
497            self.drop_index(index.id, cascade).await?;
498        }
499        self.catalog
500            .write()
501            .drop_table(database_id, schema_id, table_id);
502        Ok(())
503    }
504
505    async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<()> {
506        if cascade {
507            return Err(ErrorCode::NotSupported(
508                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
509                "use drop instead".to_owned(),
510            )
511            .into());
512        }
513        let (database_id, schema_id) = self.drop_table_or_source_id(source_id);
514        self.catalog
515            .write()
516            .drop_source(database_id, schema_id, source_id);
517        Ok(())
518    }
519
520    async fn drop_sink(&self, sink_id: u32, cascade: bool) -> Result<()> {
521        if cascade {
522            return Err(ErrorCode::NotSupported(
523                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
524                "use drop instead".to_owned(),
525            )
526            .into());
527        }
528        let (database_id, schema_id) = self.drop_table_or_sink_id(sink_id);
529        self.catalog
530            .write()
531            .drop_sink(database_id, schema_id, sink_id);
532        Ok(())
533    }
534
535    async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()> {
536        if cascade {
537            return Err(ErrorCode::NotSupported(
538                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
539                "use drop instead".to_owned(),
540            )
541            .into());
542        }
543        let (database_id, schema_id) = self.drop_table_or_subscription_id(subscription_id);
544        self.catalog
545            .write()
546            .drop_subscription(database_id, schema_id, subscription_id);
547        Ok(())
548    }
549
550    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
551        if cascade {
552            return Err(ErrorCode::NotSupported(
553                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
554                "use drop instead".to_owned(),
555            )
556            .into());
557        }
558        let &schema_id = self
559            .table_id_to_schema_id
560            .read()
561            .get(&index_id.index_id)
562            .unwrap();
563        let database_id = self.get_database_id_by_schema(schema_id);
564
565        let index = {
566            let catalog_reader = self.catalog.read();
567            let schema_catalog = catalog_reader
568                .get_schema_by_id(&database_id, &schema_id)
569                .unwrap();
570            schema_catalog.get_index_by_id(&index_id).unwrap().clone()
571        };
572
573        let index_table_id = index.index_table().id;
574        let (database_id, schema_id) = self.drop_table_or_index_id(index_id.index_id);
575        self.catalog
576            .write()
577            .drop_index(database_id, schema_id, index_id);
578        self.catalog
579            .write()
580            .drop_table(database_id, schema_id, index_table_id);
581        Ok(())
582    }
583
584    async fn drop_function(&self, _function_id: FunctionId, _cascade: bool) -> Result<()> {
585        unreachable!()
586    }
587
588    async fn drop_connection(&self, _connection_id: u32, _cascade: bool) -> Result<()> {
589        unreachable!()
590    }
591
592    async fn drop_secret(&self, _secret_id: SecretId) -> Result<()> {
593        unreachable!()
594    }
595
596    async fn drop_database(&self, database_id: u32) -> Result<()> {
597        self.catalog.write().drop_database(database_id);
598        Ok(())
599    }
600
601    async fn drop_schema(&self, schema_id: u32, _cascade: bool) -> Result<()> {
602        let database_id = self.drop_schema_id(schema_id);
603        self.catalog.write().drop_schema(database_id, schema_id);
604        Ok(())
605    }
606
607    async fn alter_name(
608        &self,
609        object_id: alter_name_request::Object,
610        object_name: &str,
611    ) -> Result<()> {
612        match object_id {
613            alter_name_request::Object::TableId(table_id) => {
614                self.catalog
615                    .write()
616                    .alter_table_name_by_id(&table_id.into(), object_name);
617                Ok(())
618            }
619            _ => {
620                unimplemented!()
621            }
622        }
623    }
624
625    async fn alter_source(&self, source: PbSource) -> Result<()> {
626        self.catalog.write().update_source(&source);
627        Ok(())
628    }
629
630    async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<()> {
631        for database in self.catalog.read().iter_databases() {
632            for schema in database.iter_schemas() {
633                match object {
634                    Object::TableId(table_id) => {
635                        if let Some(table) =
636                            schema.get_created_table_by_id(&TableId::from(table_id))
637                        {
638                            let mut pb_table = table.to_prost();
639                            pb_table.owner = owner_id;
640                            self.catalog.write().update_table(&pb_table);
641                            return Ok(());
642                        }
643                    }
644                    _ => unreachable!(),
645                }
646            }
647        }
648
649        Err(ErrorCode::ItemNotFound(format!("object not found: {:?}", object)).into())
650    }
651
652    async fn alter_set_schema(
653        &self,
654        object: alter_set_schema_request::Object,
655        new_schema_id: u32,
656    ) -> Result<()> {
657        match object {
658            alter_set_schema_request::Object::TableId(table_id) => {
659                let mut pb_table = {
660                    let reader = self.catalog.read();
661                    let table = reader.get_any_table_by_id(&table_id.into())?.to_owned();
662                    table.to_prost()
663                };
664                pb_table.schema_id = new_schema_id;
665                self.catalog.write().update_table(&pb_table);
666                self.table_id_to_schema_id
667                    .write()
668                    .insert(table_id, new_schema_id);
669                Ok(())
670            }
671            _ => unreachable!(),
672        }
673    }
674
675    async fn alter_parallelism(
676        &self,
677        _table_id: u32,
678        _parallelism: PbTableParallelism,
679        _deferred: bool,
680    ) -> Result<()> {
681        todo!()
682    }
683
684    async fn alter_swap_rename(&self, _object: alter_swap_rename_request::Object) -> Result<()> {
685        todo!()
686    }
687
688    async fn alter_secret(
689        &self,
690        _secret_id: u32,
691        _secret_name: String,
692        _database_id: u32,
693        _schema_id: u32,
694        _owner_id: u32,
695        _payload: Vec<u8>,
696    ) -> Result<()> {
697        unreachable!()
698    }
699
700    async fn alter_resource_group(
701        &self,
702        _table_id: u32,
703        _resource_group: Option<String>,
704        _deferred: bool,
705    ) -> Result<()> {
706        todo!()
707    }
708
709    async fn alter_database_param(
710        &self,
711        database_id: u32,
712        param: AlterDatabaseParam,
713    ) -> Result<()> {
714        let mut pb_database = {
715            let reader = self.catalog.read();
716            let database = reader.get_database_by_id(&database_id)?.to_owned();
717            database.to_prost()
718        };
719        match param {
720            AlterDatabaseParam::BarrierIntervalMs(interval) => {
721                pb_database.barrier_interval_ms = interval;
722            }
723            AlterDatabaseParam::CheckpointFrequency(frequency) => {
724                pb_database.checkpoint_frequency = frequency;
725            }
726        }
727        self.catalog.write().update_database(&pb_database);
728        Ok(())
729    }
730}
731
732impl MockCatalogWriter {
733    pub fn new(
734        catalog: Arc<RwLock<Catalog>>,
735        hummock_snapshot_manager: HummockSnapshotManagerRef,
736    ) -> Self {
737        catalog.write().create_database(&PbDatabase {
738            id: 0,
739            name: DEFAULT_DATABASE_NAME.to_owned(),
740            owner: DEFAULT_SUPER_USER_ID,
741            resource_group: DEFAULT_RESOURCE_GROUP.to_owned(),
742            barrier_interval_ms: None,
743            checkpoint_frequency: None,
744        });
745        catalog.write().create_schema(&PbSchema {
746            id: 1,
747            name: DEFAULT_SCHEMA_NAME.to_owned(),
748            database_id: 0,
749            owner: DEFAULT_SUPER_USER_ID,
750        });
751        catalog.write().create_schema(&PbSchema {
752            id: 2,
753            name: PG_CATALOG_SCHEMA_NAME.to_owned(),
754            database_id: 0,
755            owner: DEFAULT_SUPER_USER_ID,
756        });
757        catalog.write().create_schema(&PbSchema {
758            id: 3,
759            name: RW_CATALOG_SCHEMA_NAME.to_owned(),
760            database_id: 0,
761            owner: DEFAULT_SUPER_USER_ID,
762        });
763        let mut map: HashMap<u32, DatabaseId> = HashMap::new();
764        map.insert(1_u32, 0_u32);
765        map.insert(2_u32, 0_u32);
766        map.insert(3_u32, 0_u32);
767        Self {
768            catalog,
769            id: AtomicU32::new(3),
770            table_id_to_schema_id: Default::default(),
771            schema_id_to_database_id: RwLock::new(map),
772            hummock_snapshot_manager,
773        }
774    }
775
776    fn gen_id(&self) -> u32 {
777        // Since the 0 value is `dev` schema and database, so jump out the 0 value.
778        self.id.fetch_add(1, Ordering::SeqCst) + 1
779    }
780
781    fn add_table_or_source_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
782        self.table_id_to_schema_id
783            .write()
784            .insert(table_id, schema_id);
785    }
786
787    fn drop_table_or_source_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
788        let schema_id = self
789            .table_id_to_schema_id
790            .write()
791            .remove(&table_id)
792            .unwrap();
793        (self.get_database_id_by_schema(schema_id), schema_id)
794    }
795
796    fn add_table_or_sink_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
797        self.table_id_to_schema_id
798            .write()
799            .insert(table_id, schema_id);
800    }
801
802    fn add_table_or_subscription_id(
803        &self,
804        table_id: u32,
805        schema_id: SchemaId,
806        _database_id: DatabaseId,
807    ) {
808        self.table_id_to_schema_id
809            .write()
810            .insert(table_id, schema_id);
811    }
812
813    fn add_table_or_index_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
814        self.table_id_to_schema_id
815            .write()
816            .insert(table_id, schema_id);
817    }
818
819    fn drop_table_or_sink_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
820        let schema_id = self
821            .table_id_to_schema_id
822            .write()
823            .remove(&table_id)
824            .unwrap();
825        (self.get_database_id_by_schema(schema_id), schema_id)
826    }
827
828    fn drop_table_or_subscription_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
829        let schema_id = self
830            .table_id_to_schema_id
831            .write()
832            .remove(&table_id)
833            .unwrap();
834        (self.get_database_id_by_schema(schema_id), schema_id)
835    }
836
837    fn drop_table_or_index_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
838        let schema_id = self
839            .table_id_to_schema_id
840            .write()
841            .remove(&table_id)
842            .unwrap();
843        (self.get_database_id_by_schema(schema_id), schema_id)
844    }
845
846    fn add_schema_id(&self, schema_id: u32, database_id: DatabaseId) {
847        self.schema_id_to_database_id
848            .write()
849            .insert(schema_id, database_id);
850    }
851
852    fn drop_schema_id(&self, schema_id: u32) -> DatabaseId {
853        self.schema_id_to_database_id
854            .write()
855            .remove(&schema_id)
856            .unwrap()
857    }
858
859    fn create_source_inner(&self, mut source: PbSource) -> Result<u32> {
860        source.id = self.gen_id();
861        self.catalog.write().create_source(&source);
862        self.add_table_or_source_id(source.id, source.schema_id, source.database_id);
863        Ok(source.id)
864    }
865
866    fn create_sink_inner(&self, mut sink: PbSink, _graph: StreamFragmentGraph) -> Result<()> {
867        sink.id = self.gen_id();
868        sink.stream_job_status = PbStreamJobStatus::Created as _;
869        self.catalog.write().create_sink(&sink);
870        self.add_table_or_sink_id(sink.id, sink.schema_id, sink.database_id);
871        Ok(())
872    }
873
874    fn create_subscription_inner(&self, mut subscription: PbSubscription) -> Result<()> {
875        subscription.id = self.gen_id();
876        self.catalog.write().create_subscription(&subscription);
877        self.add_table_or_subscription_id(
878            subscription.id,
879            subscription.schema_id,
880            subscription.database_id,
881        );
882        Ok(())
883    }
884
885    fn get_database_id_by_schema(&self, schema_id: u32) -> DatabaseId {
886        *self
887            .schema_id_to_database_id
888            .read()
889            .get(&schema_id)
890            .unwrap()
891    }
892}
893
894pub struct MockUserInfoWriter {
895    id: AtomicU32,
896    user_info: Arc<RwLock<UserInfoManager>>,
897}
898
899#[async_trait::async_trait]
900impl UserInfoWriter for MockUserInfoWriter {
901    async fn create_user(&self, user: UserInfo) -> Result<()> {
902        let mut user = user;
903        user.id = self.gen_id();
904        self.user_info.write().create_user(user);
905        Ok(())
906    }
907
908    async fn drop_user(&self, id: UserId) -> Result<()> {
909        self.user_info.write().drop_user(id);
910        Ok(())
911    }
912
913    async fn update_user(
914        &self,
915        update_user: UserInfo,
916        update_fields: Vec<UpdateField>,
917    ) -> Result<()> {
918        let mut lock = self.user_info.write();
919        let id = update_user.get_id();
920        let Some(old_name) = lock.get_user_name_by_id(id) else {
921            return Ok(());
922        };
923        let mut user_info = lock.get_user_by_name(&old_name).unwrap().to_prost();
924        update_fields.into_iter().for_each(|field| match field {
925            UpdateField::Super => user_info.is_super = update_user.is_super,
926            UpdateField::Login => user_info.can_login = update_user.can_login,
927            UpdateField::CreateDb => user_info.can_create_db = update_user.can_create_db,
928            UpdateField::CreateUser => user_info.can_create_user = update_user.can_create_user,
929            UpdateField::AuthInfo => user_info.auth_info.clone_from(&update_user.auth_info),
930            UpdateField::Rename => user_info.name.clone_from(&update_user.name),
931            UpdateField::Admin => user_info.is_admin = update_user.is_admin,
932            UpdateField::Unspecified => unreachable!(),
933        });
934        lock.update_user(update_user);
935        Ok(())
936    }
937
938    /// In `MockUserInfoWriter`, we don't support expand privilege with `GrantAllTables` and
939    /// `GrantAllSources` when grant privilege to user.
940    async fn grant_privilege(
941        &self,
942        users: Vec<UserId>,
943        privileges: Vec<GrantPrivilege>,
944        with_grant_option: bool,
945        _grantor: UserId,
946    ) -> Result<()> {
947        let privileges = privileges
948            .into_iter()
949            .map(|mut p| {
950                p.action_with_opts
951                    .iter_mut()
952                    .for_each(|ao| ao.with_grant_option = with_grant_option);
953                p
954            })
955            .collect::<Vec<_>>();
956        for user_id in users {
957            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
958                u.extend_privileges(privileges.clone());
959            }
960        }
961        Ok(())
962    }
963
964    /// In `MockUserInfoWriter`, we don't support expand privilege with `RevokeAllTables` and
965    /// `RevokeAllSources` when revoke privilege from user.
966    async fn revoke_privilege(
967        &self,
968        users: Vec<UserId>,
969        privileges: Vec<GrantPrivilege>,
970        _granted_by: UserId,
971        _revoke_by: UserId,
972        revoke_grant_option: bool,
973        _cascade: bool,
974    ) -> Result<()> {
975        for user_id in users {
976            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
977                u.revoke_privileges(privileges.clone(), revoke_grant_option);
978            }
979        }
980        Ok(())
981    }
982
983    async fn alter_default_privilege(
984        &self,
985        _users: Vec<UserId>,
986        _database_id: DatabaseId,
987        _schemas: Vec<SchemaId>,
988        _operation: AlterDefaultPrivilegeOperation,
989        _operated_by: UserId,
990    ) -> Result<()> {
991        todo!()
992    }
993}
994
995impl MockUserInfoWriter {
996    pub fn new(user_info: Arc<RwLock<UserInfoManager>>) -> Self {
997        user_info.write().create_user(UserInfo {
998            id: DEFAULT_SUPER_USER_ID,
999            name: DEFAULT_SUPER_USER.to_owned(),
1000            is_super: true,
1001            can_create_db: true,
1002            can_create_user: true,
1003            can_login: true,
1004            ..Default::default()
1005        });
1006        user_info.write().create_user(UserInfo {
1007            id: DEFAULT_SUPER_USER_FOR_ADMIN_ID,
1008            name: DEFAULT_SUPER_USER_FOR_ADMIN.to_owned(),
1009            is_super: true,
1010            can_create_db: true,
1011            can_create_user: true,
1012            can_login: true,
1013            is_admin: true,
1014            ..Default::default()
1015        });
1016        Self {
1017            user_info,
1018            id: AtomicU32::new(NON_RESERVED_USER_ID as u32),
1019        }
1020    }
1021
1022    fn gen_id(&self) -> u32 {
1023        self.id.fetch_add(1, Ordering::SeqCst)
1024    }
1025}
1026
1027pub struct MockFrontendMetaClient {}
1028
1029#[async_trait::async_trait]
1030impl FrontendMetaClient for MockFrontendMetaClient {
1031    async fn try_unregister(&self) {}
1032
1033    async fn flush(&self, _database_id: DatabaseId) -> RpcResult<HummockVersionId> {
1034        Ok(INVALID_VERSION_ID)
1035    }
1036
1037    async fn wait(&self) -> RpcResult<()> {
1038        Ok(())
1039    }
1040
1041    async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult<Vec<u32>> {
1042        Ok(vec![])
1043    }
1044
1045    async fn list_table_fragments(
1046        &self,
1047        _table_ids: &[u32],
1048    ) -> RpcResult<HashMap<u32, TableFragmentInfo>> {
1049        Ok(HashMap::default())
1050    }
1051
1052    async fn list_streaming_job_states(&self) -> RpcResult<Vec<StreamingJobState>> {
1053        Ok(vec![])
1054    }
1055
1056    async fn list_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
1057        Ok(vec![])
1058    }
1059
1060    async fn list_creating_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
1061        Ok(vec![])
1062    }
1063
1064    async fn list_actor_states(&self) -> RpcResult<Vec<ActorState>> {
1065        Ok(vec![])
1066    }
1067
1068    async fn list_actor_splits(&self) -> RpcResult<Vec<ActorSplit>> {
1069        Ok(vec![])
1070    }
1071
1072    async fn list_object_dependencies(&self) -> RpcResult<Vec<PbObjectDependencies>> {
1073        Ok(vec![])
1074    }
1075
1076    async fn list_meta_snapshots(&self) -> RpcResult<Vec<MetaSnapshotMetadata>> {
1077        Ok(vec![])
1078    }
1079
1080    async fn set_system_param(
1081        &self,
1082        _param: String,
1083        _value: Option<String>,
1084    ) -> RpcResult<Option<SystemParamsReader>> {
1085        Ok(Some(SystemParams::default().into()))
1086    }
1087
1088    async fn get_session_params(&self) -> RpcResult<SessionConfig> {
1089        Ok(Default::default())
1090    }
1091
1092    async fn set_session_param(&self, _param: String, _value: Option<String>) -> RpcResult<String> {
1093        Ok("".to_owned())
1094    }
1095
1096    async fn get_ddl_progress(&self) -> RpcResult<Vec<DdlProgress>> {
1097        Ok(vec![])
1098    }
1099
1100    async fn get_tables(
1101        &self,
1102        _table_ids: &[u32],
1103        _include_dropped_tables: bool,
1104    ) -> RpcResult<HashMap<u32, Table>> {
1105        Ok(HashMap::new())
1106    }
1107
1108    async fn list_hummock_pinned_versions(&self) -> RpcResult<Vec<(u32, u64)>> {
1109        unimplemented!()
1110    }
1111
1112    async fn get_hummock_current_version(&self) -> RpcResult<HummockVersion> {
1113        Ok(HummockVersion::default())
1114    }
1115
1116    async fn get_hummock_checkpoint_version(&self) -> RpcResult<HummockVersion> {
1117        unimplemented!()
1118    }
1119
1120    async fn list_version_deltas(&self) -> RpcResult<Vec<HummockVersionDelta>> {
1121        unimplemented!()
1122    }
1123
1124    async fn list_branched_objects(&self) -> RpcResult<Vec<BranchedObject>> {
1125        unimplemented!()
1126    }
1127
1128    async fn list_hummock_compaction_group_configs(&self) -> RpcResult<Vec<CompactionGroupInfo>> {
1129        unimplemented!()
1130    }
1131
1132    async fn list_hummock_active_write_limits(&self) -> RpcResult<HashMap<u64, WriteLimit>> {
1133        unimplemented!()
1134    }
1135
1136    async fn list_hummock_meta_configs(&self) -> RpcResult<HashMap<String, String>> {
1137        unimplemented!()
1138    }
1139
1140    async fn list_event_log(&self) -> RpcResult<Vec<EventLog>> {
1141        unimplemented!()
1142    }
1143
1144    async fn list_compact_task_assignment(&self) -> RpcResult<Vec<CompactTaskAssignment>> {
1145        unimplemented!()
1146    }
1147
1148    async fn list_all_nodes(&self) -> RpcResult<Vec<WorkerNode>> {
1149        Ok(vec![])
1150    }
1151
1152    async fn list_compact_task_progress(&self) -> RpcResult<Vec<CompactTaskProgress>> {
1153        unimplemented!()
1154    }
1155
1156    async fn recover(&self) -> RpcResult<()> {
1157        unimplemented!()
1158    }
1159
1160    async fn apply_throttle(
1161        &self,
1162        _kind: PbThrottleTarget,
1163        _id: u32,
1164        _rate_limit: Option<u32>,
1165    ) -> RpcResult<()> {
1166        unimplemented!()
1167    }
1168
1169    async fn get_cluster_recovery_status(&self) -> RpcResult<RecoveryStatus> {
1170        Ok(RecoveryStatus::StatusRunning)
1171    }
1172
1173    async fn get_cluster_limits(&self) -> RpcResult<Vec<ClusterLimit>> {
1174        Ok(vec![])
1175    }
1176
1177    async fn list_rate_limits(&self) -> RpcResult<Vec<RateLimitInfo>> {
1178        Ok(vec![])
1179    }
1180
1181    async fn list_cdc_progress(&self) -> RpcResult<HashMap<u32, PbCdcProgress>> {
1182        Ok(HashMap::default())
1183    }
1184
1185    async fn get_meta_store_endpoint(&self) -> RpcResult<String> {
1186        unimplemented!()
1187    }
1188
1189    async fn alter_sink_props(
1190        &self,
1191        _sink_id: u32,
1192        _changed_props: BTreeMap<String, String>,
1193        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1194        _connector_conn_ref: Option<u32>,
1195    ) -> RpcResult<()> {
1196        unimplemented!()
1197    }
1198
1199    async fn alter_source_connector_props(
1200        &self,
1201        _source_id: u32,
1202        _changed_props: BTreeMap<String, String>,
1203        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1204        _connector_conn_ref: Option<u32>,
1205    ) -> RpcResult<()> {
1206        unimplemented!()
1207    }
1208
1209    async fn list_hosted_iceberg_tables(&self) -> RpcResult<Vec<IcebergTable>> {
1210        unimplemented!()
1211    }
1212
1213    async fn get_fragment_by_id(
1214        &self,
1215        _fragment_id: u32,
1216    ) -> RpcResult<Option<FragmentDistribution>> {
1217        unimplemented!()
1218    }
1219
1220    fn worker_id(&self) -> u32 {
1221        0
1222    }
1223
1224    async fn set_sync_log_store_aligned(&self, _job_id: u32, _aligned: bool) -> RpcResult<()> {
1225        Ok(())
1226    }
1227
1228    async fn compact_iceberg_table(&self, _sink_id: SinkId) -> RpcResult<u64> {
1229        Ok(1)
1230    }
1231
1232    async fn expire_iceberg_table_snapshots(&self, _sink_id: SinkId) -> RpcResult<()> {
1233        Ok(())
1234    }
1235
1236    async fn refresh(&self, _request: RefreshRequest) -> RpcResult<RefreshResponse> {
1237        Ok(RefreshResponse { status: None })
1238    }
1239}
1240
1241#[cfg(test)]
1242pub static PROTO_FILE_DATA: &str = r#"
1243    syntax = "proto3";
1244    package test;
1245    message TestRecord {
1246      int32 id = 1;
1247      Country country = 3;
1248      int64 zipcode = 4;
1249      float rate = 5;
1250    }
1251    message TestRecordAlterType {
1252        string id = 1;
1253        Country country = 3;
1254        int32 zipcode = 4;
1255        float rate = 5;
1256      }
1257    message TestRecordExt {
1258      int32 id = 1;
1259      Country country = 3;
1260      int64 zipcode = 4;
1261      float rate = 5;
1262      string name = 6;
1263    }
1264    message Country {
1265      string address = 1;
1266      City city = 2;
1267      string zipcode = 3;
1268    }
1269    message City {
1270      string address = 1;
1271      string zipcode = 2;
1272    }"#;
1273
1274/// Returns the file.
1275/// (`NamedTempFile` will automatically delete the file when it goes out of scope.)
1276pub fn create_proto_file(proto_data: &str) -> NamedTempFile {
1277    let in_file = Builder::new()
1278        .prefix("temp")
1279        .suffix(".proto")
1280        .rand_bytes(8)
1281        .tempfile()
1282        .unwrap();
1283
1284    let out_file = Builder::new()
1285        .prefix("temp")
1286        .suffix(".pb")
1287        .rand_bytes(8)
1288        .tempfile()
1289        .unwrap();
1290
1291    let mut file = in_file.as_file();
1292    file.write_all(proto_data.as_ref())
1293        .expect("writing binary to test file");
1294    file.flush().expect("flush temp file failed");
1295    let include_path = in_file
1296        .path()
1297        .parent()
1298        .unwrap()
1299        .to_string_lossy()
1300        .into_owned();
1301    let out_path = out_file.path().to_string_lossy().into_owned();
1302    let in_path = in_file.path().to_string_lossy().into_owned();
1303    let mut compile = std::process::Command::new("protoc");
1304
1305    let out = compile
1306        .arg("--include_imports")
1307        .arg("-I")
1308        .arg(include_path)
1309        .arg(format!("--descriptor_set_out={}", out_path))
1310        .arg(in_path)
1311        .output()
1312        .expect("failed to compile proto");
1313    if !out.status.success() {
1314        panic!("compile proto failed \n output: {:?}", out);
1315    }
1316    out_file
1317}