risingwave_frontend/
test_utils.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::io::Write;
17use std::net::{IpAddr, Ipv4Addr, SocketAddr};
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU32, Ordering};
20
21use futures_async_stream::for_await;
22use parking_lot::RwLock;
23use pgwire::net::{Address, AddressRef};
24use pgwire::pg_response::StatementType;
25use pgwire::pg_server::{BoxedError, SessionId, SessionManager, UserAuthenticator};
26use pgwire::types::Row;
27use risingwave_common::catalog::{
28    AlterDatabaseParam, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER,
29    DEFAULT_SUPER_USER_ID, FunctionId, IndexId, NON_RESERVED_USER_ID, ObjectId,
30    PG_CATALOG_SCHEMA_NAME, RW_CATALOG_SCHEMA_NAME, TableId,
31};
32use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat};
33use risingwave_common::session_config::SessionConfig;
34use risingwave_common::system_param::reader::SystemParamsReader;
35use risingwave_common::util::cluster_limit::ClusterLimit;
36use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
37use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
38use risingwave_hummock_sdk::{HummockVersionId, INVALID_VERSION_ID};
39use risingwave_pb::backup_service::MetaSnapshotMetadata;
40use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
41use risingwave_pb::catalog::{
42    PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbStreamJobStatus,
43    PbSubscription, PbTable, PbView, Table,
44};
45use risingwave_pb::common::WorkerNode;
46use risingwave_pb::ddl_service::alter_owner_request::Object;
47use risingwave_pb::ddl_service::{
48    DdlProgress, PbTableJobType, ReplaceJobPlan, TableJobType, alter_name_request,
49    alter_set_schema_request, alter_swap_rename_request, create_connection_request,
50};
51use risingwave_pb::hummock::write_limits::WriteLimit;
52use risingwave_pb::hummock::{
53    BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
54};
55use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
56use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
57use risingwave_pb::meta::list_actor_states_response::ActorState;
58use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
59use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
60use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
61use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
62use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
63use risingwave_pb::meta::{
64    EventLog, FragmentDistribution, PbTableParallelism, PbThrottleTarget, RecoveryStatus,
65    SystemParams,
66};
67use risingwave_pb::secret::PbSecretRef;
68use risingwave_pb::stream_plan::StreamFragmentGraph;
69use risingwave_pb::user::update_user_request::UpdateField;
70use risingwave_pb::user::{GrantPrivilege, UserInfo};
71use risingwave_rpc_client::error::Result as RpcResult;
72use tempfile::{Builder, NamedTempFile};
73
74use crate::FrontendOpts;
75use crate::catalog::catalog_service::CatalogWriter;
76use crate::catalog::root_catalog::Catalog;
77use crate::catalog::{ConnectionId, DatabaseId, SchemaId, SecretId};
78use crate::error::{ErrorCode, Result};
79use crate::handler::RwPgResponse;
80use crate::meta_client::FrontendMetaClient;
81use crate::scheduler::HummockSnapshotManagerRef;
82use crate::session::{AuthContext, FrontendEnv, SessionImpl};
83use crate::user::UserId;
84use crate::user::user_manager::UserInfoManager;
85use crate::user::user_service::UserInfoWriter;
86
/// An embedded frontend without starting meta and without starting frontend as a tcp server.
pub struct LocalFrontend {
    /// Options supplied by the caller when the frontend was constructed.
    pub opts: FrontendOpts,
    /// Environment that is cloned into every session created by this frontend.
    env: FrontendEnv,
}
92
93impl SessionManager for LocalFrontend {
94    type Session = SessionImpl;
95
96    fn create_dummy_session(
97        &self,
98        _database_id: u32,
99        _user_name: u32,
100    ) -> std::result::Result<Arc<Self::Session>, BoxedError> {
101        unreachable!()
102    }
103
104    fn connect(
105        &self,
106        _database: &str,
107        _user_name: &str,
108        _peer_addr: AddressRef,
109    ) -> std::result::Result<Arc<Self::Session>, BoxedError> {
110        Ok(self.session_ref())
111    }
112
113    fn cancel_queries_in_session(&self, _session_id: SessionId) {
114        unreachable!()
115    }
116
117    fn cancel_creating_jobs_in_session(&self, _session_id: SessionId) {
118        unreachable!()
119    }
120
121    fn end_session(&self, _session: &Self::Session) {
122        unreachable!()
123    }
124}
125
126impl LocalFrontend {
127    #[expect(clippy::unused_async)]
128    pub async fn new(opts: FrontendOpts) -> Self {
129        let env = FrontendEnv::mock();
130        Self { opts, env }
131    }
132
133    pub async fn run_sql(
134        &self,
135        sql: impl Into<String>,
136    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
137        let sql: Arc<str> = Arc::from(sql.into());
138        self.session_ref().run_statement(sql, vec![]).await
139    }
140
141    pub async fn run_sql_with_session(
142        &self,
143        session_ref: Arc<SessionImpl>,
144        sql: impl Into<String>,
145    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
146        let sql: Arc<str> = Arc::from(sql.into());
147        session_ref.run_statement(sql, vec![]).await
148    }
149
150    pub async fn run_user_sql(
151        &self,
152        sql: impl Into<String>,
153        database: String,
154        user_name: String,
155        user_id: UserId,
156    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
157        let sql: Arc<str> = Arc::from(sql.into());
158        self.session_user_ref(database, user_name, user_id)
159            .run_statement(sql, vec![])
160            .await
161    }
162
163    pub async fn query_formatted_result(&self, sql: impl Into<String>) -> Vec<String> {
164        let mut rsp = self.run_sql(sql).await.unwrap();
165        let mut res = vec![];
166        #[for_await]
167        for row_set in rsp.values_stream() {
168            for row in row_set.unwrap() {
169                res.push(format!("{:?}", row));
170            }
171        }
172        res
173    }
174
175    pub async fn get_explain_output(&self, sql: impl Into<String>) -> String {
176        let mut rsp = self.run_sql(sql).await.unwrap();
177        assert_eq!(rsp.stmt_type(), StatementType::EXPLAIN);
178        let mut res = String::new();
179        #[for_await]
180        for row_set in rsp.values_stream() {
181            for row in row_set.unwrap() {
182                let row: Row = row;
183                let row = row.values()[0].as_ref().unwrap();
184                res += std::str::from_utf8(row).unwrap();
185                res += "\n";
186            }
187        }
188        res
189    }
190
191    /// Creates a new session
192    pub fn session_ref(&self) -> Arc<SessionImpl> {
193        self.session_user_ref(
194            DEFAULT_DATABASE_NAME.to_owned(),
195            DEFAULT_SUPER_USER.to_owned(),
196            DEFAULT_SUPER_USER_ID,
197        )
198    }
199
200    pub fn session_user_ref(
201        &self,
202        database: String,
203        user_name: String,
204        user_id: UserId,
205    ) -> Arc<SessionImpl> {
206        Arc::new(SessionImpl::new(
207            self.env.clone(),
208            AuthContext::new(database, user_name, user_id),
209            UserAuthenticator::None,
210            // Local Frontend use a non-sense id.
211            (0, 0),
212            Address::Tcp(SocketAddr::new(
213                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
214                6666,
215            ))
216            .into(),
217            Default::default(),
218        ))
219    }
220}
221
222pub async fn get_explain_output(mut rsp: RwPgResponse) -> String {
223    if rsp.stmt_type() != StatementType::EXPLAIN {
224        panic!("RESPONSE INVALID: {rsp:?}");
225    }
226    let mut res = String::new();
227    #[for_await]
228    for row_set in rsp.values_stream() {
229        for row in row_set.unwrap() {
230            let row: Row = row;
231            let row = row.values()[0].as_ref().unwrap();
232            res += std::str::from_utf8(row).unwrap();
233            res += "\n";
234        }
235    }
236    res
237}
238
/// In-memory `CatalogWriter` used by tests: it applies every change directly to the
/// shared [`Catalog`].
pub struct MockCatalogWriter {
    /// Shared catalog that every operation reads from and mutates.
    catalog: Arc<RwLock<Catalog>>,
    /// Counter backing `gen_id`, which hands out ids for newly created objects.
    id: AtomicU32,
    /// Schema of each registered object (tables, views, sources, sinks, subscriptions
    /// and index tables), keyed by object id.
    table_id_to_schema_id: RwLock<HashMap<u32, SchemaId>>,
    /// Database of each registered schema, keyed by schema id.
    schema_id_to_database_id: RwLock<HashMap<u32, DatabaseId>>,
    /// Told about each newly created table through `add_table_for_test`.
    hummock_snapshot_manager: HummockSnapshotManagerRef,
}
246
247#[async_trait::async_trait]
248impl CatalogWriter for MockCatalogWriter {
249    async fn create_database(
250        &self,
251        db_name: &str,
252        owner: UserId,
253        resource_group: &str,
254        barrier_interval_ms: Option<u32>,
255        checkpoint_frequency: Option<u64>,
256    ) -> Result<()> {
257        let database_id = self.gen_id();
258        self.catalog.write().create_database(&PbDatabase {
259            name: db_name.to_owned(),
260            id: database_id,
261            owner,
262            resource_group: resource_group.to_owned(),
263            barrier_interval_ms,
264            checkpoint_frequency,
265        });
266        self.create_schema(database_id, DEFAULT_SCHEMA_NAME, owner)
267            .await?;
268        self.create_schema(database_id, PG_CATALOG_SCHEMA_NAME, owner)
269            .await?;
270        self.create_schema(database_id, RW_CATALOG_SCHEMA_NAME, owner)
271            .await?;
272        Ok(())
273    }
274
275    async fn create_schema(
276        &self,
277        db_id: DatabaseId,
278        schema_name: &str,
279        owner: UserId,
280    ) -> Result<()> {
281        let id = self.gen_id();
282        self.catalog.write().create_schema(&PbSchema {
283            id,
284            name: schema_name.to_owned(),
285            database_id: db_id,
286            owner,
287        });
288        self.add_schema_id(id, db_id);
289        Ok(())
290    }
291
292    async fn create_materialized_view(
293        &self,
294        mut table: PbTable,
295        _graph: StreamFragmentGraph,
296        _dependencies: HashSet<ObjectId>,
297        _specific_resource_group: Option<String>,
298        _if_not_exists: bool,
299    ) -> Result<()> {
300        table.id = self.gen_id();
301        table.stream_job_status = PbStreamJobStatus::Created as _;
302        table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
303        self.catalog.write().create_table(&table);
304        self.add_table_or_source_id(table.id, table.schema_id, table.database_id);
305        self.hummock_snapshot_manager
306            .add_table_for_test(TableId::new(table.id));
307        Ok(())
308    }
309
310    async fn create_view(&self, mut view: PbView) -> Result<()> {
311        view.id = self.gen_id();
312        self.catalog.write().create_view(&view);
313        self.add_table_or_source_id(view.id, view.schema_id, view.database_id);
314        Ok(())
315    }
316
317    async fn create_table(
318        &self,
319        source: Option<PbSource>,
320        mut table: PbTable,
321        graph: StreamFragmentGraph,
322        _job_type: PbTableJobType,
323        if_not_exists: bool,
324    ) -> Result<()> {
325        if let Some(source) = source {
326            let source_id = self.create_source_inner(source)?;
327            table.optional_associated_source_id =
328                Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id));
329        }
330        self.create_materialized_view(table, graph, HashSet::new(), None, if_not_exists)
331            .await?;
332        Ok(())
333    }
334
335    async fn replace_table(
336        &self,
337        _source: Option<PbSource>,
338        mut table: PbTable,
339        _graph: StreamFragmentGraph,
340        _job_type: TableJobType,
341    ) -> Result<()> {
342        table.stream_job_status = PbStreamJobStatus::Created as _;
343        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
344        self.catalog.write().update_table(&table);
345        Ok(())
346    }
347
348    async fn replace_source(&self, source: PbSource, _graph: StreamFragmentGraph) -> Result<()> {
349        self.catalog.write().update_source(&source);
350        Ok(())
351    }
352
353    async fn create_source(
354        &self,
355        source: PbSource,
356        _graph: Option<StreamFragmentGraph>,
357        _if_not_exists: bool,
358    ) -> Result<()> {
359        self.create_source_inner(source).map(|_| ())
360    }
361
362    async fn create_sink(
363        &self,
364        sink: PbSink,
365        graph: StreamFragmentGraph,
366        _affected_table_change: Option<ReplaceJobPlan>,
367        _dependencies: HashSet<ObjectId>,
368        _if_not_exists: bool,
369    ) -> Result<()> {
370        self.create_sink_inner(sink, graph)
371    }
372
373    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
374        self.create_subscription_inner(subscription)
375    }
376
377    async fn create_index(
378        &self,
379        mut index: PbIndex,
380        mut index_table: PbTable,
381        _graph: StreamFragmentGraph,
382        _if_not_exists: bool,
383    ) -> Result<()> {
384        index_table.id = self.gen_id();
385        index_table.stream_job_status = PbStreamJobStatus::Created as _;
386        index_table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
387        self.catalog.write().create_table(&index_table);
388        self.add_table_or_index_id(
389            index_table.id,
390            index_table.schema_id,
391            index_table.database_id,
392        );
393
394        index.id = index_table.id;
395        index.index_table_id = index_table.id;
396        self.catalog.write().create_index(&index);
397        Ok(())
398    }
399
    /// Not supported by this mock: any call panics via `unreachable!()`.
    async fn create_function(&self, _function: PbFunction) -> Result<()> {
        unreachable!()
    }
403
    /// Not supported by this mock: any call panics via `unreachable!()`.
    async fn create_connection(
        &self,
        _connection_name: String,
        _database_id: u32,
        _schema_id: u32,
        _owner_id: u32,
        _connection: create_connection_request::Payload,
    ) -> Result<()> {
        unreachable!()
    }
414
    /// Not supported by this mock: any call panics via `unreachable!()`.
    async fn create_secret(
        &self,
        _secret_name: String,
        _database_id: u32,
        _schema_id: u32,
        _owner_id: u32,
        _payload: Vec<u8>,
    ) -> Result<()> {
        unreachable!()
    }
425
    /// Not supported by this mock: any call panics via `unreachable!()`.
    async fn comment_on(&self, _comment: PbComment) -> Result<()> {
        unreachable!()
    }
429
430    async fn drop_table(
431        &self,
432        source_id: Option<u32>,
433        table_id: TableId,
434        cascade: bool,
435    ) -> Result<()> {
436        if cascade {
437            return Err(ErrorCode::NotSupported(
438                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
439                "use drop instead".to_owned(),
440            )
441            .into());
442        }
443        if let Some(source_id) = source_id {
444            self.drop_table_or_source_id(source_id);
445        }
446        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.table_id);
447        let indexes =
448            self.catalog
449                .read()
450                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
451        for index in indexes {
452            self.drop_index(index.id, cascade).await?;
453        }
454        self.catalog
455            .write()
456            .drop_table(database_id, schema_id, table_id);
457        if let Some(source_id) = source_id {
458            self.catalog
459                .write()
460                .drop_source(database_id, schema_id, source_id);
461        }
462        Ok(())
463    }
464
    /// Not supported by this mock: any call panics via `unreachable!()`.
    async fn drop_view(&self, _view_id: u32, _cascade: bool) -> Result<()> {
        unreachable!()
    }
468
469    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
470        if cascade {
471            return Err(ErrorCode::NotSupported(
472                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
473                "use drop instead".to_owned(),
474            )
475            .into());
476        }
477        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.table_id);
478        let indexes =
479            self.catalog
480                .read()
481                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
482        for index in indexes {
483            self.drop_index(index.id, cascade).await?;
484        }
485        self.catalog
486            .write()
487            .drop_table(database_id, schema_id, table_id);
488        Ok(())
489    }
490
491    async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<()> {
492        if cascade {
493            return Err(ErrorCode::NotSupported(
494                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
495                "use drop instead".to_owned(),
496            )
497            .into());
498        }
499        let (database_id, schema_id) = self.drop_table_or_source_id(source_id);
500        self.catalog
501            .write()
502            .drop_source(database_id, schema_id, source_id);
503        Ok(())
504    }
505
506    async fn drop_sink(
507        &self,
508        sink_id: u32,
509        cascade: bool,
510        _target_table_change: Option<ReplaceJobPlan>,
511    ) -> Result<()> {
512        if cascade {
513            return Err(ErrorCode::NotSupported(
514                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
515                "use drop instead".to_owned(),
516            )
517            .into());
518        }
519        let (database_id, schema_id) = self.drop_table_or_sink_id(sink_id);
520        self.catalog
521            .write()
522            .drop_sink(database_id, schema_id, sink_id);
523        Ok(())
524    }
525
526    async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()> {
527        if cascade {
528            return Err(ErrorCode::NotSupported(
529                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
530                "use drop instead".to_owned(),
531            )
532            .into());
533        }
534        let (database_id, schema_id) = self.drop_table_or_subscription_id(subscription_id);
535        self.catalog
536            .write()
537            .drop_subscription(database_id, schema_id, subscription_id);
538        Ok(())
539    }
540
541    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
542        if cascade {
543            return Err(ErrorCode::NotSupported(
544                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
545                "use drop instead".to_owned(),
546            )
547            .into());
548        }
549        let &schema_id = self
550            .table_id_to_schema_id
551            .read()
552            .get(&index_id.index_id)
553            .unwrap();
554        let database_id = self.get_database_id_by_schema(schema_id);
555
556        let index = {
557            let catalog_reader = self.catalog.read();
558            let schema_catalog = catalog_reader
559                .get_schema_by_id(&database_id, &schema_id)
560                .unwrap();
561            schema_catalog.get_index_by_id(&index_id).unwrap().clone()
562        };
563
564        let index_table_id = index.index_table.id;
565        let (database_id, schema_id) = self.drop_table_or_index_id(index_id.index_id);
566        self.catalog
567            .write()
568            .drop_index(database_id, schema_id, index_id);
569        self.catalog
570            .write()
571            .drop_table(database_id, schema_id, index_table_id);
572        Ok(())
573    }
574
    /// Not supported by this mock: any call panics via `unreachable!()`.
    async fn drop_function(&self, _function_id: FunctionId) -> Result<()> {
        unreachable!()
    }
578
    /// Not supported by this mock: any call panics via `unreachable!()`.
    async fn drop_connection(&self, _connection_id: ConnectionId) -> Result<()> {
        unreachable!()
    }
582
    /// Not supported by this mock: any call panics via `unreachable!()`.
    async fn drop_secret(&self, _secret_id: SecretId) -> Result<()> {
        unreachable!()
    }
586
587    async fn drop_database(&self, database_id: u32) -> Result<()> {
588        self.catalog.write().drop_database(database_id);
589        Ok(())
590    }
591
592    async fn drop_schema(&self, schema_id: u32, _cascade: bool) -> Result<()> {
593        let database_id = self.drop_schema_id(schema_id);
594        self.catalog.write().drop_schema(database_id, schema_id);
595        Ok(())
596    }
597
598    async fn alter_name(
599        &self,
600        object_id: alter_name_request::Object,
601        object_name: &str,
602    ) -> Result<()> {
603        match object_id {
604            alter_name_request::Object::TableId(table_id) => {
605                self.catalog
606                    .write()
607                    .alter_table_name_by_id(&table_id.into(), object_name);
608                Ok(())
609            }
610            _ => {
611                unimplemented!()
612            }
613        }
614    }
615
616    async fn alter_source(&self, source: PbSource) -> Result<()> {
617        self.catalog.write().update_source(&source);
618        Ok(())
619    }
620
621    async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<()> {
622        for database in self.catalog.read().iter_databases() {
623            for schema in database.iter_schemas() {
624                match object {
625                    Object::TableId(table_id) => {
626                        if let Some(table) =
627                            schema.get_created_table_by_id(&TableId::from(table_id))
628                        {
629                            let mut pb_table = table.to_prost();
630                            pb_table.owner = owner_id;
631                            self.catalog.write().update_table(&pb_table);
632                            return Ok(());
633                        }
634                    }
635                    _ => unreachable!(),
636                }
637            }
638        }
639
640        Err(ErrorCode::ItemNotFound(format!("object not found: {:?}", object)).into())
641    }
642
643    async fn alter_set_schema(
644        &self,
645        object: alter_set_schema_request::Object,
646        new_schema_id: u32,
647    ) -> Result<()> {
648        match object {
649            alter_set_schema_request::Object::TableId(table_id) => {
650                let mut pb_table = {
651                    let reader = self.catalog.read();
652                    let table = reader.get_any_table_by_id(&table_id.into())?.to_owned();
653                    table.to_prost()
654                };
655                pb_table.schema_id = new_schema_id;
656                self.catalog.write().update_table(&pb_table);
657                self.table_id_to_schema_id
658                    .write()
659                    .insert(table_id, new_schema_id);
660                Ok(())
661            }
662            _ => unreachable!(),
663        }
664    }
665
    /// Not implemented in this mock: any call panics via `todo!()`.
    async fn alter_parallelism(
        &self,
        _table_id: u32,
        _parallelism: PbTableParallelism,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }
674
    /// Not implemented in this mock: any call panics via `todo!()`.
    async fn alter_swap_rename(&self, _object: alter_swap_rename_request::Object) -> Result<()> {
        todo!()
    }
678
    /// Not supported by this mock: any call panics via `unreachable!()`.
    async fn alter_secret(
        &self,
        _secret_id: u32,
        _secret_name: String,
        _database_id: u32,
        _schema_id: u32,
        _owner_id: u32,
        _payload: Vec<u8>,
    ) -> Result<()> {
        unreachable!()
    }
690
    /// Not implemented in this mock: any call panics via `todo!()`.
    async fn alter_resource_group(
        &self,
        _table_id: u32,
        _resource_group: Option<String>,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }
699
700    async fn alter_database_param(
701        &self,
702        database_id: u32,
703        param: AlterDatabaseParam,
704    ) -> Result<()> {
705        let mut pb_database = {
706            let reader = self.catalog.read();
707            let database = reader.get_database_by_id(&database_id)?.to_owned();
708            database.to_prost()
709        };
710        match param {
711            AlterDatabaseParam::BarrierIntervalMs(interval) => {
712                pb_database.barrier_interval_ms = interval;
713            }
714            AlterDatabaseParam::CheckpointFrequency(frequency) => {
715                pb_database.checkpoint_frequency = frequency;
716            }
717        }
718        self.catalog.write().update_database(&pb_database);
719        Ok(())
720    }
721}
722
723impl MockCatalogWriter {
724    pub fn new(
725        catalog: Arc<RwLock<Catalog>>,
726        hummock_snapshot_manager: HummockSnapshotManagerRef,
727    ) -> Self {
728        catalog.write().create_database(&PbDatabase {
729            id: 0,
730            name: DEFAULT_DATABASE_NAME.to_owned(),
731            owner: DEFAULT_SUPER_USER_ID,
732            resource_group: DEFAULT_RESOURCE_GROUP.to_owned(),
733            barrier_interval_ms: None,
734            checkpoint_frequency: None,
735        });
736        catalog.write().create_schema(&PbSchema {
737            id: 1,
738            name: DEFAULT_SCHEMA_NAME.to_owned(),
739            database_id: 0,
740            owner: DEFAULT_SUPER_USER_ID,
741        });
742        catalog.write().create_schema(&PbSchema {
743            id: 2,
744            name: PG_CATALOG_SCHEMA_NAME.to_owned(),
745            database_id: 0,
746            owner: DEFAULT_SUPER_USER_ID,
747        });
748        catalog.write().create_schema(&PbSchema {
749            id: 3,
750            name: RW_CATALOG_SCHEMA_NAME.to_owned(),
751            database_id: 0,
752            owner: DEFAULT_SUPER_USER_ID,
753        });
754        let mut map: HashMap<u32, DatabaseId> = HashMap::new();
755        map.insert(1_u32, 0_u32);
756        map.insert(2_u32, 0_u32);
757        map.insert(3_u32, 0_u32);
758        Self {
759            catalog,
760            id: AtomicU32::new(3),
761            table_id_to_schema_id: Default::default(),
762            schema_id_to_database_id: RwLock::new(map),
763            hummock_snapshot_manager,
764        }
765    }
766
767    fn gen_id(&self) -> u32 {
768        // Since the 0 value is `dev` schema and database, so jump out the 0 value.
769        self.id.fetch_add(1, Ordering::SeqCst) + 1
770    }
771
772    fn add_table_or_source_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
773        self.table_id_to_schema_id
774            .write()
775            .insert(table_id, schema_id);
776    }
777
778    fn drop_table_or_source_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
779        let schema_id = self
780            .table_id_to_schema_id
781            .write()
782            .remove(&table_id)
783            .unwrap();
784        (self.get_database_id_by_schema(schema_id), schema_id)
785    }
786
787    fn add_table_or_sink_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
788        self.table_id_to_schema_id
789            .write()
790            .insert(table_id, schema_id);
791    }
792
793    fn add_table_or_subscription_id(
794        &self,
795        table_id: u32,
796        schema_id: SchemaId,
797        _database_id: DatabaseId,
798    ) {
799        self.table_id_to_schema_id
800            .write()
801            .insert(table_id, schema_id);
802    }
803
804    fn add_table_or_index_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
805        self.table_id_to_schema_id
806            .write()
807            .insert(table_id, schema_id);
808    }
809
810    fn drop_table_or_sink_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
811        let schema_id = self
812            .table_id_to_schema_id
813            .write()
814            .remove(&table_id)
815            .unwrap();
816        (self.get_database_id_by_schema(schema_id), schema_id)
817    }
818
819    fn drop_table_or_subscription_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
820        let schema_id = self
821            .table_id_to_schema_id
822            .write()
823            .remove(&table_id)
824            .unwrap();
825        (self.get_database_id_by_schema(schema_id), schema_id)
826    }
827
828    fn drop_table_or_index_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
829        let schema_id = self
830            .table_id_to_schema_id
831            .write()
832            .remove(&table_id)
833            .unwrap();
834        (self.get_database_id_by_schema(schema_id), schema_id)
835    }
836
837    fn add_schema_id(&self, schema_id: u32, database_id: DatabaseId) {
838        self.schema_id_to_database_id
839            .write()
840            .insert(schema_id, database_id);
841    }
842
843    fn drop_schema_id(&self, schema_id: u32) -> DatabaseId {
844        self.schema_id_to_database_id
845            .write()
846            .remove(&schema_id)
847            .unwrap()
848    }
849
850    fn create_source_inner(&self, mut source: PbSource) -> Result<u32> {
851        source.id = self.gen_id();
852        self.catalog.write().create_source(&source);
853        self.add_table_or_source_id(source.id, source.schema_id, source.database_id);
854        Ok(source.id)
855    }
856
857    fn create_sink_inner(&self, mut sink: PbSink, _graph: StreamFragmentGraph) -> Result<()> {
858        sink.id = self.gen_id();
859        self.catalog.write().create_sink(&sink);
860        self.add_table_or_sink_id(sink.id, sink.schema_id, sink.database_id);
861        Ok(())
862    }
863
864    fn create_subscription_inner(&self, mut subscription: PbSubscription) -> Result<()> {
865        subscription.id = self.gen_id();
866        self.catalog.write().create_subscription(&subscription);
867        self.add_table_or_subscription_id(
868            subscription.id,
869            subscription.schema_id,
870            subscription.database_id,
871        );
872        Ok(())
873    }
874
875    fn get_database_id_by_schema(&self, schema_id: u32) -> DatabaseId {
876        *self
877            .schema_id_to_database_id
878            .read()
879            .get(&schema_id)
880            .unwrap()
881    }
882}
883
/// In-memory `UserInfoWriter` used by tests: it applies every change directly to the
/// shared [`UserInfoManager`].
pub struct MockUserInfoWriter {
    /// Counter for generated user ids — presumably what `gen_id` advances; its
    /// definition is outside this view (TODO confirm).
    id: AtomicU32,
    /// Shared user registry that every operation mutates.
    user_info: Arc<RwLock<UserInfoManager>>,
}
888
889#[async_trait::async_trait]
890impl UserInfoWriter for MockUserInfoWriter {
891    async fn create_user(&self, user: UserInfo) -> Result<()> {
892        let mut user = user;
893        user.id = self.gen_id();
894        self.user_info.write().create_user(user);
895        Ok(())
896    }
897
898    async fn drop_user(&self, id: UserId) -> Result<()> {
899        self.user_info.write().drop_user(id);
900        Ok(())
901    }
902
903    async fn update_user(
904        &self,
905        update_user: UserInfo,
906        update_fields: Vec<UpdateField>,
907    ) -> Result<()> {
908        let mut lock = self.user_info.write();
909        let id = update_user.get_id();
910        let Some(old_name) = lock.get_user_name_by_id(id) else {
911            return Ok(());
912        };
913        let mut user_info = lock.get_user_by_name(&old_name).unwrap().to_prost();
914        update_fields.into_iter().for_each(|field| match field {
915            UpdateField::Super => user_info.is_super = update_user.is_super,
916            UpdateField::Login => user_info.can_login = update_user.can_login,
917            UpdateField::CreateDb => user_info.can_create_db = update_user.can_create_db,
918            UpdateField::CreateUser => user_info.can_create_user = update_user.can_create_user,
919            UpdateField::AuthInfo => user_info.auth_info.clone_from(&update_user.auth_info),
920            UpdateField::Rename => user_info.name.clone_from(&update_user.name),
921            UpdateField::Unspecified => unreachable!(),
922        });
923        lock.update_user(update_user);
924        Ok(())
925    }
926
927    /// In `MockUserInfoWriter`, we don't support expand privilege with `GrantAllTables` and
928    /// `GrantAllSources` when grant privilege to user.
929    async fn grant_privilege(
930        &self,
931        users: Vec<UserId>,
932        privileges: Vec<GrantPrivilege>,
933        with_grant_option: bool,
934        _grantor: UserId,
935    ) -> Result<()> {
936        let privileges = privileges
937            .into_iter()
938            .map(|mut p| {
939                p.action_with_opts
940                    .iter_mut()
941                    .for_each(|ao| ao.with_grant_option = with_grant_option);
942                p
943            })
944            .collect::<Vec<_>>();
945        for user_id in users {
946            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
947                u.extend_privileges(privileges.clone());
948            }
949        }
950        Ok(())
951    }
952
953    /// In `MockUserInfoWriter`, we don't support expand privilege with `RevokeAllTables` and
954    /// `RevokeAllSources` when revoke privilege from user.
955    async fn revoke_privilege(
956        &self,
957        users: Vec<UserId>,
958        privileges: Vec<GrantPrivilege>,
959        _granted_by: UserId,
960        _revoke_by: UserId,
961        revoke_grant_option: bool,
962        _cascade: bool,
963    ) -> Result<()> {
964        for user_id in users {
965            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
966                u.revoke_privileges(privileges.clone(), revoke_grant_option);
967            }
968        }
969        Ok(())
970    }
971}
972
973impl MockUserInfoWriter {
974    pub fn new(user_info: Arc<RwLock<UserInfoManager>>) -> Self {
975        user_info.write().create_user(UserInfo {
976            id: DEFAULT_SUPER_USER_ID,
977            name: DEFAULT_SUPER_USER.to_owned(),
978            is_super: true,
979            can_create_db: true,
980            can_create_user: true,
981            can_login: true,
982            ..Default::default()
983        });
984        Self {
985            user_info,
986            id: AtomicU32::new(NON_RESERVED_USER_ID as u32),
987        }
988    }
989
990    fn gen_id(&self) -> u32 {
991        self.id.fetch_add(1, Ordering::SeqCst)
992    }
993}
994
995pub struct MockFrontendMetaClient {}
996
997#[async_trait::async_trait]
998impl FrontendMetaClient for MockFrontendMetaClient {
999    async fn try_unregister(&self) {}
1000
1001    async fn flush(&self, _database_id: DatabaseId) -> RpcResult<HummockVersionId> {
1002        Ok(INVALID_VERSION_ID)
1003    }
1004
1005    async fn wait(&self) -> RpcResult<()> {
1006        Ok(())
1007    }
1008
1009    async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult<Vec<u32>> {
1010        Ok(vec![])
1011    }
1012
1013    async fn list_table_fragments(
1014        &self,
1015        _table_ids: &[u32],
1016    ) -> RpcResult<HashMap<u32, TableFragmentInfo>> {
1017        Ok(HashMap::default())
1018    }
1019
1020    async fn list_streaming_job_states(&self) -> RpcResult<Vec<StreamingJobState>> {
1021        Ok(vec![])
1022    }
1023
1024    async fn list_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
1025        Ok(vec![])
1026    }
1027
1028    async fn list_creating_stream_scan_fragment_distribution(
1029        &self,
1030    ) -> RpcResult<Vec<FragmentDistribution>> {
1031        Ok(vec![])
1032    }
1033
1034    async fn list_actor_states(&self) -> RpcResult<Vec<ActorState>> {
1035        Ok(vec![])
1036    }
1037
1038    async fn list_actor_splits(&self) -> RpcResult<Vec<ActorSplit>> {
1039        Ok(vec![])
1040    }
1041
1042    async fn list_object_dependencies(&self) -> RpcResult<Vec<PbObjectDependencies>> {
1043        Ok(vec![])
1044    }
1045
1046    async fn list_meta_snapshots(&self) -> RpcResult<Vec<MetaSnapshotMetadata>> {
1047        Ok(vec![])
1048    }
1049
1050    async fn get_system_params(&self) -> RpcResult<SystemParamsReader> {
1051        Ok(SystemParams::default().into())
1052    }
1053
1054    async fn set_system_param(
1055        &self,
1056        _param: String,
1057        _value: Option<String>,
1058    ) -> RpcResult<Option<SystemParamsReader>> {
1059        Ok(Some(SystemParams::default().into()))
1060    }
1061
1062    async fn get_session_params(&self) -> RpcResult<SessionConfig> {
1063        Ok(Default::default())
1064    }
1065
1066    async fn set_session_param(&self, _param: String, _value: Option<String>) -> RpcResult<String> {
1067        Ok("".to_owned())
1068    }
1069
1070    async fn get_ddl_progress(&self) -> RpcResult<Vec<DdlProgress>> {
1071        Ok(vec![])
1072    }
1073
1074    async fn get_tables(
1075        &self,
1076        _table_ids: &[u32],
1077        _include_dropped_tables: bool,
1078    ) -> RpcResult<HashMap<u32, Table>> {
1079        Ok(HashMap::new())
1080    }
1081
1082    async fn list_hummock_pinned_versions(&self) -> RpcResult<Vec<(u32, u64)>> {
1083        unimplemented!()
1084    }
1085
1086    async fn get_hummock_current_version(&self) -> RpcResult<HummockVersion> {
1087        Ok(HummockVersion::default())
1088    }
1089
1090    async fn get_hummock_checkpoint_version(&self) -> RpcResult<HummockVersion> {
1091        unimplemented!()
1092    }
1093
1094    async fn list_version_deltas(&self) -> RpcResult<Vec<HummockVersionDelta>> {
1095        unimplemented!()
1096    }
1097
1098    async fn list_branched_objects(&self) -> RpcResult<Vec<BranchedObject>> {
1099        unimplemented!()
1100    }
1101
1102    async fn list_hummock_compaction_group_configs(&self) -> RpcResult<Vec<CompactionGroupInfo>> {
1103        unimplemented!()
1104    }
1105
1106    async fn list_hummock_active_write_limits(&self) -> RpcResult<HashMap<u64, WriteLimit>> {
1107        unimplemented!()
1108    }
1109
1110    async fn list_hummock_meta_configs(&self) -> RpcResult<HashMap<String, String>> {
1111        unimplemented!()
1112    }
1113
1114    async fn list_event_log(&self) -> RpcResult<Vec<EventLog>> {
1115        unimplemented!()
1116    }
1117
1118    async fn list_compact_task_assignment(&self) -> RpcResult<Vec<CompactTaskAssignment>> {
1119        unimplemented!()
1120    }
1121
1122    async fn list_all_nodes(&self) -> RpcResult<Vec<WorkerNode>> {
1123        Ok(vec![])
1124    }
1125
1126    async fn list_compact_task_progress(&self) -> RpcResult<Vec<CompactTaskProgress>> {
1127        unimplemented!()
1128    }
1129
1130    async fn recover(&self) -> RpcResult<()> {
1131        unimplemented!()
1132    }
1133
1134    async fn apply_throttle(
1135        &self,
1136        _kind: PbThrottleTarget,
1137        _id: u32,
1138        _rate_limit: Option<u32>,
1139    ) -> RpcResult<()> {
1140        unimplemented!()
1141    }
1142
1143    async fn get_cluster_recovery_status(&self) -> RpcResult<RecoveryStatus> {
1144        Ok(RecoveryStatus::StatusRunning)
1145    }
1146
1147    async fn get_cluster_limits(&self) -> RpcResult<Vec<ClusterLimit>> {
1148        Ok(vec![])
1149    }
1150
1151    async fn list_rate_limits(&self) -> RpcResult<Vec<RateLimitInfo>> {
1152        Ok(vec![])
1153    }
1154
1155    async fn get_meta_store_endpoint(&self) -> RpcResult<String> {
1156        unimplemented!()
1157    }
1158
1159    async fn alter_sink_props(
1160        &self,
1161        _sink_id: u32,
1162        _changed_props: BTreeMap<String, String>,
1163        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1164        _connector_conn_ref: Option<u32>,
1165    ) -> RpcResult<()> {
1166        unimplemented!()
1167    }
1168
1169    async fn list_hosted_iceberg_tables(&self) -> RpcResult<Vec<IcebergTable>> {
1170        unimplemented!()
1171    }
1172
1173    async fn get_fragment_by_id(
1174        &self,
1175        _fragment_id: u32,
1176    ) -> RpcResult<Option<FragmentDistribution>> {
1177        unimplemented!()
1178    }
1179
1180    fn worker_id(&self) -> u32 {
1181        0
1182    }
1183}
1184
/// Proto3 schema source (package `test`) available in test builds only.
///
/// Declares `TestRecord`, plus two variants of it: `TestRecordAlterType`, where `id` is a
/// `string` and `zipcode` an `int32`, and `TestRecordExt`, which adds a `name` field. The
/// nested `Country` and `City` messages are referenced by those records.
#[cfg(test)]
pub static PROTO_FILE_DATA: &str = r#"
    syntax = "proto3";
    package test;
    message TestRecord {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
    }
    message TestRecordAlterType {
        string id = 1;
        Country country = 3;
        int32 zipcode = 4;
        float rate = 5;
      }
    message TestRecordExt {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
      string name = 6;
    }
    message Country {
      string address = 1;
      City city = 2;
      string zipcode = 3;
    }
    message City {
      string address = 1;
      string zipcode = 2;
    }"#;
1217
1218/// Returns the file.
1219/// (`NamedTempFile` will automatically delete the file when it goes out of scope.)
1220pub fn create_proto_file(proto_data: &str) -> NamedTempFile {
1221    let in_file = Builder::new()
1222        .prefix("temp")
1223        .suffix(".proto")
1224        .rand_bytes(8)
1225        .tempfile()
1226        .unwrap();
1227
1228    let out_file = Builder::new()
1229        .prefix("temp")
1230        .suffix(".pb")
1231        .rand_bytes(8)
1232        .tempfile()
1233        .unwrap();
1234
1235    let mut file = in_file.as_file();
1236    file.write_all(proto_data.as_ref())
1237        .expect("writing binary to test file");
1238    file.flush().expect("flush temp file failed");
1239    let include_path = in_file
1240        .path()
1241        .parent()
1242        .unwrap()
1243        .to_string_lossy()
1244        .into_owned();
1245    let out_path = out_file.path().to_string_lossy().into_owned();
1246    let in_path = in_file.path().to_string_lossy().into_owned();
1247    let mut compile = std::process::Command::new("protoc");
1248
1249    let out = compile
1250        .arg("--include_imports")
1251        .arg("-I")
1252        .arg(include_path)
1253        .arg(format!("--descriptor_set_out={}", out_path))
1254        .arg(in_path)
1255        .output()
1256        .expect("failed to compile proto");
1257    if !out.status.success() {
1258        panic!("compile proto failed \n output: {:?}", out);
1259    }
1260    out_file
1261}