risingwave_frontend/test_utils.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::io::Write;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};

use futures_async_stream::for_await;
use parking_lot::RwLock;
use pgwire::net::{Address, AddressRef};
use pgwire::pg_response::StatementType;
use pgwire::pg_server::{SessionId, SessionManager, UserAuthenticator};
use pgwire::types::Row;
use risingwave_common::catalog::{
    AlterDatabaseParam, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER,
    DEFAULT_SUPER_USER_FOR_ADMIN, DEFAULT_SUPER_USER_FOR_ADMIN_ID, DEFAULT_SUPER_USER_ID,
    FunctionId, IndexId, NON_RESERVED_USER_ID, ObjectId, PG_CATALOG_SCHEMA_NAME,
    RW_CATALOG_SCHEMA_NAME, TableId,
};
use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat};
use risingwave_common::id::{ConnectionId, JobId, SourceId, SubscriptionId, ViewId, WorkerId};
use risingwave_common::session_config::SessionConfig;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::util::cluster_limit::ClusterLimit;
use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{HummockVersionId, INVALID_VERSION_ID};
use risingwave_pb::backup_service::MetaSnapshotMetadata;
use risingwave_pb::catalog::{
    PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbStreamJobStatus,
    PbSubscription, PbTable, PbView, Table,
};
use risingwave_pb::common::WorkerNode;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
use risingwave_pb::ddl_service::{
    DdlProgress, PbTableJobType, TableJobType, alter_name_request, alter_set_schema_request,
    alter_swap_rename_request, create_connection_request,
};
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::{
    BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
};
use risingwave_pb::id::ActorId;
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::{
    EventLog, FragmentDistribution, PbTableParallelism, PbThrottleTarget, RecoveryStatus,
    RefreshRequest, RefreshResponse, SystemParams,
};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
use risingwave_pb::user::update_user_request::UpdateField;
use risingwave_pb::user::{GrantPrivilege, UserInfo};
use risingwave_rpc_client::error::Result as RpcResult;
use tempfile::{Builder, NamedTempFile};

use crate::FrontendOpts;
use crate::catalog::catalog_service::CatalogWriter;
use crate::catalog::root_catalog::Catalog;
use crate::catalog::{DatabaseId, FragmentId, SchemaId, SecretId, SinkId};
use crate::error::{ErrorCode, Result, RwError};
use crate::handler::RwPgResponse;
use crate::meta_client::FrontendMetaClient;
use crate::scheduler::HummockSnapshotManagerRef;
use crate::session::{AuthContext, FrontendEnv, SessionImpl};
use crate::user::UserId;
use crate::user::user_manager::UserInfoManager;
use crate::user::user_service::UserInfoWriter;

/// An embedded frontend that starts neither a meta node nor a frontend TCP server.
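///
/// # Example
///
/// A minimal usage sketch (assumes a Tokio test runtime, as in the frontend's
/// async tests):
///
/// ```ignore
/// let frontend = LocalFrontend::new(FrontendOpts::default()).await;
/// frontend.run_sql("CREATE TABLE t (v INT)").await.unwrap();
/// let plan = frontend.get_explain_output("EXPLAIN SELECT v FROM t").await;
/// ```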
pub struct LocalFrontend {
    pub opts: FrontendOpts,
    env: FrontendEnv,
}

impl SessionManager for LocalFrontend {
    type Error = RwError;
    type Session = SessionImpl;

    fn create_dummy_session(
        &self,
        _database_id: DatabaseId,
        _user_name: u32,
    ) -> std::result::Result<Arc<Self::Session>, Self::Error> {
        unreachable!()
    }

    fn connect(
        &self,
        _database: &str,
        _user_name: &str,
        _peer_addr: AddressRef,
    ) -> std::result::Result<Arc<Self::Session>, Self::Error> {
        Ok(self.session_ref())
    }

    fn cancel_queries_in_session(&self, _session_id: SessionId) {
        unreachable!()
    }

    fn cancel_creating_jobs_in_session(&self, _session_id: SessionId) {
        unreachable!()
    }

    fn end_session(&self, _session: &Self::Session) {
        unreachable!()
    }
}

impl LocalFrontend {
    #[expect(clippy::unused_async)]
    pub async fn new(opts: FrontendOpts) -> Self {
        let env = FrontendEnv::mock();
        Self { opts, env }
    }

    pub async fn run_sql(
        &self,
        sql: impl Into<String>,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        self.session_ref().run_statement(sql, vec![]).await
    }

    pub async fn run_sql_with_session(
        &self,
        session_ref: Arc<SessionImpl>,
        sql: impl Into<String>,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        session_ref.run_statement(sql, vec![]).await
    }

    pub async fn run_user_sql(
        &self,
        sql: impl Into<String>,
        database: String,
        user_name: String,
        user_id: UserId,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        self.session_user_ref(database, user_name, user_id)
            .run_statement(sql, vec![])
            .await
    }

    pub async fn query_formatted_result(&self, sql: impl Into<String>) -> Vec<String> {
        let mut rsp = self.run_sql(sql).await.unwrap();
        let mut res = vec![];
        #[for_await]
        for row_set in rsp.values_stream() {
            for row in row_set.unwrap() {
                res.push(format!("{:?}", row));
            }
        }
        res
    }

    pub async fn get_explain_output(&self, sql: impl Into<String>) -> String {
        let mut rsp = self.run_sql(sql).await.unwrap();
        assert_eq!(rsp.stmt_type(), StatementType::EXPLAIN);
        let mut res = String::new();
        #[for_await]
        for row_set in rsp.values_stream() {
            for row in row_set.unwrap() {
                let row: Row = row;
                let row = row.values()[0].as_ref().unwrap();
                res += std::str::from_utf8(row).unwrap();
                res += "\n";
            }
        }
        res
    }

    /// Creates a new session
    pub fn session_ref(&self) -> Arc<SessionImpl> {
        self.session_user_ref(
            DEFAULT_DATABASE_NAME.to_owned(),
            DEFAULT_SUPER_USER.to_owned(),
            DEFAULT_SUPER_USER_ID,
        )
    }

    pub fn session_user_ref(
        &self,
        database: String,
        user_name: String,
        user_id: UserId,
    ) -> Arc<SessionImpl> {
        Arc::new(SessionImpl::new(
            self.env.clone(),
            AuthContext::new(database, user_name, user_id),
            UserAuthenticator::None,
            // The local frontend uses a dummy session id.
            (0, 0),
            Address::Tcp(SocketAddr::new(
                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
                6666,
            ))
            .into(),
            Default::default(),
        ))
    }
}

pub async fn get_explain_output(mut rsp: RwPgResponse) -> String {
    if rsp.stmt_type() != StatementType::EXPLAIN {
        panic!("RESPONSE INVALID: {rsp:?}");
    }
    let mut res = String::new();
    #[for_await]
    for row_set in rsp.values_stream() {
        for row in row_set.unwrap() {
            let row: Row = row;
            let row = row.values()[0].as_ref().unwrap();
            res += std::str::from_utf8(row).unwrap();
            res += "\n";
        }
    }
    res
}

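/// A mock [`CatalogWriter`] that applies DDL directly to an in-memory [`Catalog`],
/// handing out object ids from a local atomic counter instead of asking a meta
/// node. Table-to-schema and schema-to-database mappings are tracked locally so
/// that drops can be resolved without a real catalog service.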
pub struct MockCatalogWriter {
    catalog: Arc<RwLock<Catalog>>,
    id: AtomicU32,
    table_id_to_schema_id: RwLock<HashMap<u32, SchemaId>>,
    schema_id_to_database_id: RwLock<HashMap<SchemaId, DatabaseId>>,
    hummock_snapshot_manager: HummockSnapshotManagerRef,
}

#[async_trait::async_trait]
impl CatalogWriter for MockCatalogWriter {
    async fn create_database(
        &self,
        db_name: &str,
        owner: UserId,
        resource_group: &str,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
    ) -> Result<()> {
        let database_id = DatabaseId::new(self.gen_id());
        self.catalog.write().create_database(&PbDatabase {
            name: db_name.to_owned(),
            id: database_id,
            owner,
            resource_group: resource_group.to_owned(),
            barrier_interval_ms,
            checkpoint_frequency,
        });
        self.create_schema(database_id, DEFAULT_SCHEMA_NAME, owner)
            .await?;
        self.create_schema(database_id, PG_CATALOG_SCHEMA_NAME, owner)
            .await?;
        self.create_schema(database_id, RW_CATALOG_SCHEMA_NAME, owner)
            .await?;
        Ok(())
    }

    async fn create_schema(
        &self,
        db_id: DatabaseId,
        schema_name: &str,
        owner: UserId,
    ) -> Result<()> {
        let id = self.gen_id();
        self.catalog.write().create_schema(&PbSchema {
            id,
            name: schema_name.to_owned(),
            database_id: db_id,
            owner,
        });
        self.add_schema_id(id, db_id);
        Ok(())
    }

    async fn create_materialized_view(
        &self,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
        _dependencies: HashSet<ObjectId>,
        _specific_resource_group: Option<String>,
        _if_not_exists: bool,
    ) -> Result<()> {
        table.id = self.gen_id();
        table.stream_job_status = PbStreamJobStatus::Created as _;
        table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
        self.catalog.write().create_table(&table);
        self.add_table_or_source_id(table.id.as_raw_id(), table.schema_id, table.database_id);
        self.hummock_snapshot_manager.add_table_for_test(table.id);
        Ok(())
    }

    async fn replace_materialized_view(
        &self,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
    ) -> Result<()> {
        table.stream_job_status = PbStreamJobStatus::Created as _;
        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
        self.catalog.write().update_table(&table);
        Ok(())
    }

    async fn create_view(&self, mut view: PbView, _dependencies: HashSet<ObjectId>) -> Result<()> {
        view.id = self.gen_id();
        self.catalog.write().create_view(&view);
        self.add_table_or_source_id(view.id.as_raw_id(), view.schema_id, view.database_id);
        Ok(())
    }

    async fn create_table(
        &self,
        source: Option<PbSource>,
        mut table: PbTable,
        graph: StreamFragmentGraph,
        _job_type: PbTableJobType,
        if_not_exists: bool,
        _dependencies: HashSet<ObjectId>,
    ) -> Result<()> {
        if let Some(source) = source {
            let source_id = self.create_source_inner(source)?;
            table.optional_associated_source_id = Some(source_id.into());
        }
        self.create_materialized_view(table, graph, HashSet::new(), None, if_not_exists)
            .await?;
        Ok(())
    }

    async fn replace_table(
        &self,
        _source: Option<PbSource>,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
        _job_type: TableJobType,
    ) -> Result<()> {
        table.stream_job_status = PbStreamJobStatus::Created as _;
        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
        self.catalog.write().update_table(&table);
        Ok(())
    }

    async fn replace_source(&self, source: PbSource, _graph: StreamFragmentGraph) -> Result<()> {
        self.catalog.write().update_source(&source);
        Ok(())
    }

    async fn create_source(
        &self,
        source: PbSource,
        _graph: Option<StreamFragmentGraph>,
        _if_not_exists: bool,
    ) -> Result<()> {
        self.create_source_inner(source).map(|_| ())
    }

    async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        _dependencies: HashSet<ObjectId>,
        _if_not_exists: bool,
    ) -> Result<()> {
        self.create_sink_inner(sink, graph)
    }

    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
        self.create_subscription_inner(subscription)
    }

    async fn create_index(
        &self,
        mut index: PbIndex,
        mut index_table: PbTable,
        _graph: StreamFragmentGraph,
        _if_not_exists: bool,
    ) -> Result<()> {
        index_table.id = self.gen_id();
        index_table.stream_job_status = PbStreamJobStatus::Created as _;
        index_table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
        self.catalog.write().create_table(&index_table);
        self.add_table_or_index_id(
            index_table.id.as_raw_id(),
            index_table.schema_id,
            index_table.database_id,
        );

        index.id = index_table.id.as_raw_id().into();
        index.index_table_id = index_table.id;
        self.catalog.write().create_index(&index);
        Ok(())
    }

    async fn create_function(&self, _function: PbFunction) -> Result<()> {
        unreachable!()
    }

    async fn create_connection(
        &self,
        _connection_name: String,
        _database_id: DatabaseId,
        _schema_id: SchemaId,
        _owner_id: u32,
        _connection: create_connection_request::Payload,
    ) -> Result<()> {
        unreachable!()
    }

    async fn create_secret(
        &self,
        _secret_name: String,
        _database_id: DatabaseId,
        _schema_id: SchemaId,
        _owner_id: u32,
        _payload: Vec<u8>,
    ) -> Result<()> {
        unreachable!()
    }

    async fn comment_on(&self, _comment: PbComment) -> Result<()> {
        unreachable!()
    }

    async fn drop_table(
        &self,
        source_id: Option<u32>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        if let Some(source_id) = source_id {
            self.drop_table_or_source_id(source_id);
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
        let indexes =
            self.catalog
                .read()
                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
        for index in indexes {
            self.drop_index(index.id, cascade).await?;
        }
        self.catalog
            .write()
            .drop_table(database_id, schema_id, table_id);
        if let Some(source_id) = source_id {
            self.catalog
                .write()
                .drop_source(database_id, schema_id, source_id.into());
        }
        Ok(())
    }

    async fn drop_view(&self, _view_id: ViewId, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
        let indexes =
            self.catalog
                .read()
                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
        for index in indexes {
            self.drop_index(index.id, cascade).await?;
        }
        self.catalog
            .write()
            .drop_table(database_id, schema_id, table_id);
        Ok(())
    }

    async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(source_id.as_raw_id());
        self.catalog
            .write()
            .drop_source(database_id, schema_id, source_id);
        Ok(())
    }

    async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_sink_id(sink_id.as_raw_id());
        self.catalog
            .write()
            .drop_sink(database_id, schema_id, sink_id);
        Ok(())
    }

    async fn drop_subscription(
        &self,
        subscription_id: SubscriptionId,
        cascade: bool,
    ) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) =
            self.drop_table_or_subscription_id(subscription_id.as_raw_id());
        self.catalog
            .write()
            .drop_subscription(database_id, schema_id, subscription_id);
        Ok(())
    }

    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let &schema_id = self
            .table_id_to_schema_id
            .read()
            .get(&index_id.as_raw_id())
            .unwrap();
        let database_id = self.get_database_id_by_schema(schema_id);

        let index = {
            let catalog_reader = self.catalog.read();
            let schema_catalog = catalog_reader
                .get_schema_by_id(database_id, schema_id)
                .unwrap();
            schema_catalog.get_index_by_id(index_id).unwrap().clone()
        };

        let index_table_id = index.index_table().id;
        let (database_id, schema_id) = self.drop_table_or_index_id(index_id.as_raw_id());
        self.catalog
            .write()
            .drop_index(database_id, schema_id, index_id);
        self.catalog
            .write()
            .drop_table(database_id, schema_id, index_table_id);
        Ok(())
    }

    async fn drop_function(&self, _function_id: FunctionId, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_connection(&self, _connection_id: ConnectionId, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_secret(&self, _secret_id: SecretId) -> Result<()> {
        unreachable!()
    }

    async fn drop_database(&self, database_id: DatabaseId) -> Result<()> {
        self.catalog.write().drop_database(database_id);
        Ok(())
    }

    async fn drop_schema(&self, schema_id: SchemaId, _cascade: bool) -> Result<()> {
        let database_id = self.drop_schema_id(schema_id);
        self.catalog.write().drop_schema(database_id, schema_id);
        Ok(())
    }

    async fn alter_name(
        &self,
        object_id: alter_name_request::Object,
        object_name: &str,
    ) -> Result<()> {
        match object_id {
            alter_name_request::Object::TableId(table_id) => {
                self.catalog
                    .write()
                    .alter_table_name_by_id(table_id.into(), object_name);
                Ok(())
            }
            _ => {
                unimplemented!()
            }
        }
    }

    async fn alter_source(&self, source: PbSource) -> Result<()> {
        self.catalog.write().update_source(&source);
        Ok(())
    }

    async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<()> {
        for database in self.catalog.read().iter_databases() {
            for schema in database.iter_schemas() {
                match object {
                    Object::TableId(table_id) => {
                        if let Some(table) = schema.get_created_table_by_id(TableId::from(table_id))
                        {
                            let mut pb_table = table.to_prost();
                            pb_table.owner = owner_id;
                            self.catalog.write().update_table(&pb_table);
                            return Ok(());
                        }
                    }
                    _ => unreachable!(),
                }
            }
        }

        Err(ErrorCode::ItemNotFound(format!("object not found: {:?}", object)).into())
    }

    async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> Result<()> {
        match object {
            alter_set_schema_request::Object::TableId(table_id) => {
                let mut pb_table = {
                    let reader = self.catalog.read();
                    let table = reader.get_any_table_by_id(table_id.into())?.to_owned();
                    table.to_prost()
                };
                pb_table.schema_id = new_schema_id;
                self.catalog.write().update_table(&pb_table);
                self.table_id_to_schema_id
                    .write()
                    .insert(table_id, new_schema_id);
                Ok(())
            }
            _ => unreachable!(),
        }
    }

    async fn alter_parallelism(
        &self,
        _job_id: JobId,
        _parallelism: PbTableParallelism,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }

    async fn alter_config(
        &self,
        _job_id: JobId,
        _entries_to_add: HashMap<String, String>,
        _keys_to_remove: Vec<String>,
    ) -> Result<()> {
        todo!()
    }

    async fn alter_swap_rename(&self, _object: alter_swap_rename_request::Object) -> Result<()> {
        todo!()
    }

    async fn alter_secret(
        &self,
        _secret_id: SecretId,
        _secret_name: String,
        _database_id: DatabaseId,
        _schema_id: SchemaId,
        _owner_id: u32,
        _payload: Vec<u8>,
    ) -> Result<()> {
        unreachable!()
    }

    async fn alter_resource_group(
        &self,
        _table_id: TableId,
        _resource_group: Option<String>,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }

    async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> Result<()> {
        let mut pb_database = {
            let reader = self.catalog.read();
            let database = reader.get_database_by_id(database_id)?.to_owned();
            database.to_prost()
        };
        match param {
            AlterDatabaseParam::BarrierIntervalMs(interval) => {
                pb_database.barrier_interval_ms = interval;
            }
            AlterDatabaseParam::CheckpointFrequency(frequency) => {
                pb_database.checkpoint_frequency = frequency;
            }
        }
        self.catalog.write().update_database(&pb_database);
        Ok(())
    }

    async fn create_iceberg_table(
        &self,
        _table_job_info: PbTableJobInfo,
        _sink_job_info: PbSinkJobInfo,
        _iceberg_source: PbSource,
        _if_not_exists: bool,
    ) -> Result<()> {
        todo!()
    }
}

impl MockCatalogWriter {
    pub fn new(
        catalog: Arc<RwLock<Catalog>>,
        hummock_snapshot_manager: HummockSnapshotManagerRef,
    ) -> Self {
        catalog.write().create_database(&PbDatabase {
            id: 0.into(),
            name: DEFAULT_DATABASE_NAME.to_owned(),
            owner: DEFAULT_SUPER_USER_ID,
            resource_group: DEFAULT_RESOURCE_GROUP.to_owned(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        });
        catalog.write().create_schema(&PbSchema {
            id: 1.into(),
            name: DEFAULT_SCHEMA_NAME.to_owned(),
            database_id: 0.into(),
            owner: DEFAULT_SUPER_USER_ID,
        });
        catalog.write().create_schema(&PbSchema {
            id: 2.into(),
            name: PG_CATALOG_SCHEMA_NAME.to_owned(),
            database_id: 0.into(),
            owner: DEFAULT_SUPER_USER_ID,
        });
        catalog.write().create_schema(&PbSchema {
            id: 3.into(),
            name: RW_CATALOG_SCHEMA_NAME.to_owned(),
            database_id: 0.into(),
            owner: DEFAULT_SUPER_USER_ID,
        });
        let mut map: HashMap<SchemaId, DatabaseId> = HashMap::new();
        map.insert(1_u32.into(), 0_u32.into());
        map.insert(2_u32.into(), 0_u32.into());
        map.insert(3_u32.into(), 0_u32.into());
        Self {
            catalog,
            id: AtomicU32::new(3),
            table_id_to_schema_id: Default::default(),
            schema_id_to_database_id: RwLock::new(map),
            hummock_snapshot_manager,
        }
    }

    fn gen_id<T: From<u32>>(&self) -> T {
        // Id 0 is taken by the `dev` database and schema, so offset by 1 to skip it.
        (self.id.fetch_add(1, Ordering::SeqCst) + 1).into()
    }

    fn add_table_or_source_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn drop_table_or_source_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn add_table_or_sink_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn add_table_or_subscription_id(
        &self,
        table_id: u32,
        schema_id: SchemaId,
        _database_id: DatabaseId,
    ) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn add_table_or_index_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn drop_table_or_sink_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn drop_table_or_subscription_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn drop_table_or_index_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn add_schema_id(&self, schema_id: SchemaId, database_id: DatabaseId) {
        self.schema_id_to_database_id
            .write()
            .insert(schema_id, database_id);
    }

    fn drop_schema_id(&self, schema_id: SchemaId) -> DatabaseId {
        self.schema_id_to_database_id
            .write()
            .remove(&schema_id)
            .unwrap()
    }

    fn create_source_inner(&self, mut source: PbSource) -> Result<SourceId> {
        source.id = self.gen_id();
        self.catalog.write().create_source(&source);
        self.add_table_or_source_id(source.id.as_raw_id(), source.schema_id, source.database_id);
        Ok(source.id)
    }

    fn create_sink_inner(&self, mut sink: PbSink, _graph: StreamFragmentGraph) -> Result<()> {
        sink.id = self.gen_id();
        sink.stream_job_status = PbStreamJobStatus::Created as _;
        self.catalog.write().create_sink(&sink);
        self.add_table_or_sink_id(sink.id.as_raw_id(), sink.schema_id, sink.database_id);
        Ok(())
    }

    fn create_subscription_inner(&self, mut subscription: PbSubscription) -> Result<()> {
        subscription.id = self.gen_id();
        self.catalog.write().create_subscription(&subscription);
        self.add_table_or_subscription_id(
            subscription.id.as_raw_id(),
            subscription.schema_id,
            subscription.database_id,
        );
        Ok(())
    }

    fn get_database_id_by_schema(&self, schema_id: SchemaId) -> DatabaseId {
        *self
            .schema_id_to_database_id
            .read()
            .get(&schema_id)
            .unwrap()
    }
}

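/// A mock [`UserInfoWriter`] backed by an in-memory [`UserInfoManager`].
/// [`MockUserInfoWriter::new`] pre-populates the default super user and the
/// default admin super user.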
pub struct MockUserInfoWriter {
    id: AtomicU32,
    user_info: Arc<RwLock<UserInfoManager>>,
}

#[async_trait::async_trait]
impl UserInfoWriter for MockUserInfoWriter {
    async fn create_user(&self, mut user: UserInfo) -> Result<()> {
        user.id = self.gen_id();
        self.user_info.write().create_user(user);
        Ok(())
    }

    async fn drop_user(&self, id: UserId) -> Result<()> {
        self.user_info.write().drop_user(id);
        Ok(())
    }

    async fn update_user(
        &self,
        update_user: UserInfo,
        update_fields: Vec<UpdateField>,
    ) -> Result<()> {
        let mut lock = self.user_info.write();
        let id = update_user.get_id();
        let Some(old_name) = lock.get_user_name_by_id(id) else {
            return Ok(());
        };
        let mut user_info = lock.get_user_by_name(&old_name).unwrap().to_prost();
        update_fields.into_iter().for_each(|field| match field {
            UpdateField::Super => user_info.is_super = update_user.is_super,
            UpdateField::Login => user_info.can_login = update_user.can_login,
            UpdateField::CreateDb => user_info.can_create_db = update_user.can_create_db,
            UpdateField::CreateUser => user_info.can_create_user = update_user.can_create_user,
            UpdateField::AuthInfo => user_info.auth_info.clone_from(&update_user.auth_info),
            UpdateField::Rename => user_info.name.clone_from(&update_user.name),
            UpdateField::Admin => user_info.is_admin = update_user.is_admin,
            UpdateField::Unspecified => unreachable!(),
        });
        // Apply the merged record so that fields absent from `update_fields`
        // keep their previous values.
        lock.update_user(user_info);
        Ok(())
    }

    /// `MockUserInfoWriter` does not support expanding privileges via `GrantAllTables` and
    /// `GrantAllSources` when granting privileges to a user.
    async fn grant_privilege(
        &self,
        users: Vec<UserId>,
        privileges: Vec<GrantPrivilege>,
        with_grant_option: bool,
        _grantor: UserId,
    ) -> Result<()> {
        let privileges = privileges
            .into_iter()
            .map(|mut p| {
                p.action_with_opts
                    .iter_mut()
                    .for_each(|ao| ao.with_grant_option = with_grant_option);
                p
            })
            .collect::<Vec<_>>();
        for user_id in users {
            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
                u.extend_privileges(privileges.clone());
            }
        }
        Ok(())
    }

    /// `MockUserInfoWriter` does not support expanding privileges via `RevokeAllTables` and
    /// `RevokeAllSources` when revoking privileges from a user.
    async fn revoke_privilege(
        &self,
        users: Vec<UserId>,
        privileges: Vec<GrantPrivilege>,
        _granted_by: UserId,
        _revoke_by: UserId,
        revoke_grant_option: bool,
        _cascade: bool,
    ) -> Result<()> {
        for user_id in users {
            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
                u.revoke_privileges(privileges.clone(), revoke_grant_option);
            }
        }
        Ok(())
    }

    async fn alter_default_privilege(
        &self,
        _users: Vec<UserId>,
        _database_id: DatabaseId,
        _schemas: Vec<SchemaId>,
        _operation: AlterDefaultPrivilegeOperation,
        _operated_by: UserId,
    ) -> Result<()> {
        todo!()
    }
}

impl MockUserInfoWriter {
    pub fn new(user_info: Arc<RwLock<UserInfoManager>>) -> Self {
        user_info.write().create_user(UserInfo {
            id: DEFAULT_SUPER_USER_ID,
            name: DEFAULT_SUPER_USER.to_owned(),
            is_super: true,
            can_create_db: true,
            can_create_user: true,
            can_login: true,
            ..Default::default()
        });
        user_info.write().create_user(UserInfo {
            id: DEFAULT_SUPER_USER_FOR_ADMIN_ID,
            name: DEFAULT_SUPER_USER_FOR_ADMIN.to_owned(),
            is_super: true,
            can_create_db: true,
            can_create_user: true,
            can_login: true,
            is_admin: true,
            ..Default::default()
        });
        Self {
            user_info,
            id: AtomicU32::new(NON_RESERVED_USER_ID as u32),
        }
    }

    fn gen_id(&self) -> u32 {
        self.id.fetch_add(1, Ordering::SeqCst)
    }
}

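/// A [`FrontendMetaClient`] stub: queries that tests rely on return empty or
/// default values, while RPCs the tests never exercise are left `unimplemented!()`.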
pub struct MockFrontendMetaClient {}

#[async_trait::async_trait]
impl FrontendMetaClient for MockFrontendMetaClient {
    async fn try_unregister(&self) {}

    async fn flush(&self, _database_id: DatabaseId) -> RpcResult<HummockVersionId> {
        Ok(INVALID_VERSION_ID)
    }

    async fn wait(&self) -> RpcResult<()> {
        Ok(())
    }

    async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult<Vec<u32>> {
        Ok(vec![])
    }

    async fn list_table_fragments(
        &self,
        _table_ids: &[JobId],
    ) -> RpcResult<HashMap<JobId, TableFragmentInfo>> {
        Ok(HashMap::default())
    }

    async fn list_streaming_job_states(&self) -> RpcResult<Vec<StreamingJobState>> {
        Ok(vec![])
    }

    async fn list_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
        Ok(vec![])
    }

    async fn list_creating_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
        Ok(vec![])
    }

    async fn list_actor_states(&self) -> RpcResult<Vec<ActorState>> {
        Ok(vec![])
    }

    async fn list_actor_splits(&self) -> RpcResult<Vec<ActorSplit>> {
        Ok(vec![])
    }

    async fn list_object_dependencies(&self) -> RpcResult<Vec<PbObjectDependencies>> {
        Ok(vec![])
    }

    async fn list_meta_snapshots(&self) -> RpcResult<Vec<MetaSnapshotMetadata>> {
        Ok(vec![])
    }

    async fn set_system_param(
        &self,
        _param: String,
        _value: Option<String>,
    ) -> RpcResult<Option<SystemParamsReader>> {
        Ok(Some(SystemParams::default().into()))
    }

    async fn get_session_params(&self) -> RpcResult<SessionConfig> {
        Ok(Default::default())
    }

    async fn set_session_param(&self, _param: String, _value: Option<String>) -> RpcResult<String> {
        Ok("".to_owned())
    }

    async fn get_ddl_progress(&self) -> RpcResult<Vec<DdlProgress>> {
        Ok(vec![])
    }

    async fn get_tables(
        &self,
        _table_ids: Vec<crate::catalog::TableId>,
        _include_dropped_tables: bool,
    ) -> RpcResult<HashMap<crate::catalog::TableId, Table>> {
        Ok(HashMap::new())
    }

    async fn list_hummock_pinned_versions(&self) -> RpcResult<Vec<(WorkerId, u64)>> {
        unimplemented!()
    }

    async fn list_refresh_table_states(&self) -> RpcResult<Vec<RefreshTableState>> {
        unimplemented!()
    }

    async fn get_hummock_current_version(&self) -> RpcResult<HummockVersion> {
        Ok(HummockVersion::default())
    }

    async fn get_hummock_checkpoint_version(&self) -> RpcResult<HummockVersion> {
        unimplemented!()
    }

    async fn list_version_deltas(&self) -> RpcResult<Vec<HummockVersionDelta>> {
        unimplemented!()
    }

    async fn list_branched_objects(&self) -> RpcResult<Vec<BranchedObject>> {
        unimplemented!()
    }

    async fn list_hummock_compaction_group_configs(&self) -> RpcResult<Vec<CompactionGroupInfo>> {
        unimplemented!()
    }

    async fn list_hummock_active_write_limits(&self) -> RpcResult<HashMap<u64, WriteLimit>> {
        unimplemented!()
    }

    async fn list_hummock_meta_configs(&self) -> RpcResult<HashMap<String, String>> {
        unimplemented!()
    }

    async fn list_event_log(&self) -> RpcResult<Vec<EventLog>> {
        Ok(vec![])
    }

    async fn list_compact_task_assignment(&self) -> RpcResult<Vec<CompactTaskAssignment>> {
        unimplemented!()
    }

    async fn list_all_nodes(&self) -> RpcResult<Vec<WorkerNode>> {
        Ok(vec![])
    }

    async fn list_compact_task_progress(&self) -> RpcResult<Vec<CompactTaskProgress>> {
        unimplemented!()
    }

    async fn recover(&self) -> RpcResult<()> {
        unimplemented!()
    }

    async fn apply_throttle(
        &self,
        _kind: PbThrottleTarget,
        _id: u32,
        _rate_limit: Option<u32>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn alter_fragment_parallelism(
        &self,
        _fragment_ids: Vec<FragmentId>,
        _parallelism: Option<PbTableParallelism>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn get_cluster_recovery_status(&self) -> RpcResult<RecoveryStatus> {
        Ok(RecoveryStatus::StatusRunning)
    }

    async fn get_cluster_limits(&self) -> RpcResult<Vec<ClusterLimit>> {
        Ok(vec![])
    }

    async fn list_rate_limits(&self) -> RpcResult<Vec<RateLimitInfo>> {
        Ok(vec![])
    }

    async fn list_cdc_progress(&self) -> RpcResult<HashMap<JobId, PbCdcProgress>> {
        Ok(HashMap::default())
    }

    async fn get_meta_store_endpoint(&self) -> RpcResult<String> {
        unimplemented!()
    }

    async fn alter_sink_props(
        &self,
        _sink_id: SinkId,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
        _connector_conn_ref: Option<ConnectionId>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn alter_iceberg_table_props(
        &self,
        _table_id: TableId,
        _sink_id: SinkId,
        _source_id: SourceId,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
        _connector_conn_ref: Option<ConnectionId>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn alter_source_connector_props(
        &self,
        _source_id: SourceId,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
        _connector_conn_ref: Option<ConnectionId>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn alter_connection_connector_props(
        &self,
        _connection_id: u32,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
    ) -> RpcResult<()> {
        Ok(())
    }

    async fn list_hosted_iceberg_tables(&self) -> RpcResult<Vec<IcebergTable>> {
        unimplemented!()
    }

    async fn get_fragment_by_id(
        &self,
        _fragment_id: FragmentId,
    ) -> RpcResult<Option<FragmentDistribution>> {
        unimplemented!()
    }

    async fn get_fragment_vnodes(
        &self,
        _fragment_id: FragmentId,
    ) -> RpcResult<Vec<(ActorId, Vec<u32>)>> {
        unimplemented!()
    }

    async fn get_actor_vnodes(&self, _actor_id: ActorId) -> RpcResult<Vec<u32>> {
        unimplemented!()
    }

    fn worker_id(&self) -> WorkerId {
        0.into()
    }

    async fn set_sync_log_store_aligned(&self, _job_id: JobId, _aligned: bool) -> RpcResult<()> {
        Ok(())
    }

    async fn compact_iceberg_table(&self, _sink_id: SinkId) -> RpcResult<u64> {
        Ok(1)
    }

    async fn expire_iceberg_table_snapshots(&self, _sink_id: SinkId) -> RpcResult<()> {
        Ok(())
    }

    async fn refresh(&self, _request: RefreshRequest) -> RpcResult<RefreshResponse> {
        Ok(RefreshResponse { status: None })
    }

    fn cluster_id(&self) -> &str {
        "test-cluster-uuid"
    }

    async fn list_unmigrated_tables(&self) -> RpcResult<HashMap<crate::catalog::TableId, String>> {
        unimplemented!()
    }
}

#[cfg(test)]
pub static PROTO_FILE_DATA: &str = r#"
    syntax = "proto3";
    package test;
    message TestRecord {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
    }
    message TestRecordAlterType {
        string id = 1;
        Country country = 3;
        int32 zipcode = 4;
        float rate = 5;
      }
    message TestRecordExt {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
      string name = 6;
    }
    message Country {
      string address = 1;
      City city = 2;
      string zipcode = 3;
    }
    message City {
      string address = 1;
      string zipcode = 2;
    }"#;

/// Compiles `proto_data` into a descriptor set with `protoc` and returns the resulting file.
/// (The `NamedTempFile` automatically deletes the file when it goes out of scope.)
pub fn create_proto_file(proto_data: &str) -> NamedTempFile {
    let in_file = Builder::new()
        .prefix("temp")
        .suffix(".proto")
        .rand_bytes(8)
        .tempfile()
        .unwrap();

    let out_file = Builder::new()
        .prefix("temp")
        .suffix(".pb")
        .rand_bytes(8)
        .tempfile()
        .unwrap();

    let mut file = in_file.as_file();
    file.write_all(proto_data.as_ref())
        .expect("writing binary to test file");
    file.flush().expect("flush temp file failed");
    let include_path = in_file
        .path()
        .parent()
        .unwrap()
        .to_string_lossy()
        .into_owned();
    let out_path = out_file.path().to_string_lossy().into_owned();
    let in_path = in_file.path().to_string_lossy().into_owned();
    let mut compile = std::process::Command::new("protoc");

    let out = compile
        .arg("--include_imports")
        .arg("-I")
        .arg(include_path)
        .arg(format!("--descriptor_set_out={}", out_path))
        .arg(in_path)
        .output()
        .expect("failed to compile proto");
    if !out.status.success() {
        panic!("compile proto failed \n output: {:?}", out);
    }
    out_file
}
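
// A usage sketch (test-only): compile the sample schema above and keep the
// returned handle alive for as long as the descriptor file is needed. Assumes
// `protoc` is installed and on `PATH`.
//
//     let file = create_proto_file(PROTO_FILE_DATA);
//     let descriptor_path = file.path().to_string_lossy().into_owned();
//     // `file` is dropped here, which deletes the temporary descriptor.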