risingwave_frontend/test_utils.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::io::Write;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};

use futures_async_stream::for_await;
use parking_lot::RwLock;
use pgwire::net::{Address, AddressRef};
use pgwire::pg_response::StatementType;
use pgwire::pg_server::{BoxedError, SessionId, SessionManager, UserAuthenticator};
use pgwire::types::Row;
use risingwave_common::catalog::{
    AlterDatabaseParam, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER,
    DEFAULT_SUPER_USER_FOR_ADMIN, DEFAULT_SUPER_USER_FOR_ADMIN_ID, DEFAULT_SUPER_USER_ID,
    FunctionId, IndexId, NON_RESERVED_USER_ID, ObjectId, PG_CATALOG_SCHEMA_NAME,
    RW_CATALOG_SCHEMA_NAME, TableId,
};
use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat};
use risingwave_common::id::{ConnectionId, JobId, SourceId, SubscriptionId, ViewId, WorkerId};
use risingwave_common::session_config::SessionConfig;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::util::cluster_limit::ClusterLimit;
use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{HummockVersionId, INVALID_VERSION_ID};
use risingwave_pb::backup_service::MetaSnapshotMetadata;
use risingwave_pb::catalog::{
    PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbStreamJobStatus,
    PbSubscription, PbTable, PbView, Table,
};
use risingwave_pb::common::WorkerNode;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
use risingwave_pb::ddl_service::{
    DdlProgress, PbTableJobType, TableJobType, alter_name_request, alter_set_schema_request,
    alter_swap_rename_request, create_connection_request,
};
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::{
    BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
};
use risingwave_pb::id::ActorId;
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::{
    EventLog, FragmentDistribution, PbTableParallelism, PbThrottleTarget, RecoveryStatus,
    RefreshRequest, RefreshResponse, SystemParams,
};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
use risingwave_pb::user::update_user_request::UpdateField;
use risingwave_pb::user::{GrantPrivilege, UserInfo};
use risingwave_rpc_client::error::Result as RpcResult;
use tempfile::{Builder, NamedTempFile};

use crate::FrontendOpts;
use crate::catalog::catalog_service::CatalogWriter;
use crate::catalog::root_catalog::Catalog;
use crate::catalog::{DatabaseId, FragmentId, SchemaId, SecretId, SinkId};
use crate::error::{ErrorCode, Result};
use crate::handler::RwPgResponse;
use crate::meta_client::FrontendMetaClient;
use crate::scheduler::HummockSnapshotManagerRef;
use crate::session::{AuthContext, FrontendEnv, SessionImpl};
use crate::user::UserId;
use crate::user::user_manager::UserInfoManager;
use crate::user::user_service::UserInfoWriter;

/// An embedded frontend that starts neither a meta node nor a frontend TCP server.
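///
/// A minimal usage sketch (hypothetical test code; assumes `FrontendOpts::default()`
/// is available):
///
/// ```ignore
/// let frontend = LocalFrontend::new(FrontendOpts::default()).await;
/// let response = frontend.run_sql("CREATE TABLE t (v INT)").await.unwrap();
/// ```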
pub struct LocalFrontend {
    pub opts: FrontendOpts,
    env: FrontendEnv,
}

impl SessionManager for LocalFrontend {
    type Session = SessionImpl;

    fn create_dummy_session(
        &self,
        _database_id: DatabaseId,
        _user_name: u32,
    ) -> std::result::Result<Arc<Self::Session>, BoxedError> {
        unreachable!()
    }

    fn connect(
        &self,
        _database: &str,
        _user_name: &str,
        _peer_addr: AddressRef,
    ) -> std::result::Result<Arc<Self::Session>, BoxedError> {
        Ok(self.session_ref())
    }

    fn cancel_queries_in_session(&self, _session_id: SessionId) {
        unreachable!()
    }

    fn cancel_creating_jobs_in_session(&self, _session_id: SessionId) {
        unreachable!()
    }

    fn end_session(&self, _session: &Self::Session) {
        unreachable!()
    }
}

impl LocalFrontend {
    #[expect(clippy::unused_async)]
    pub async fn new(opts: FrontendOpts) -> Self {
        let env = FrontendEnv::mock();
        Self { opts, env }
    }

    pub async fn run_sql(
        &self,
        sql: impl Into<String>,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        self.session_ref().run_statement(sql, vec![]).await
    }

    pub async fn run_sql_with_session(
        &self,
        session_ref: Arc<SessionImpl>,
        sql: impl Into<String>,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        session_ref.run_statement(sql, vec![]).await
    }

    pub async fn run_user_sql(
        &self,
        sql: impl Into<String>,
        database: String,
        user_name: String,
        user_id: UserId,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        self.session_user_ref(database, user_name, user_id)
            .run_statement(sql, vec![])
            .await
    }

    pub async fn query_formatted_result(&self, sql: impl Into<String>) -> Vec<String> {
        let mut rsp = self.run_sql(sql).await.unwrap();
        let mut res = vec![];
        #[for_await]
        for row_set in rsp.values_stream() {
            for row in row_set.unwrap() {
                res.push(format!("{:?}", row));
            }
        }
        res
    }

    pub async fn get_explain_output(&self, sql: impl Into<String>) -> String {
        let mut rsp = self.run_sql(sql).await.unwrap();
        assert_eq!(rsp.stmt_type(), StatementType::EXPLAIN);
        let mut res = String::new();
        #[for_await]
        for row_set in rsp.values_stream() {
            for row in row_set.unwrap() {
                let row: Row = row;
                let row = row.values()[0].as_ref().unwrap();
                res += std::str::from_utf8(row).unwrap();
                res += "\n";
            }
        }
        res
    }

    /// Creates a new session for the default database as the default super user.
    pub fn session_ref(&self) -> Arc<SessionImpl> {
        self.session_user_ref(
            DEFAULT_DATABASE_NAME.to_owned(),
            DEFAULT_SUPER_USER.to_owned(),
            DEFAULT_SUPER_USER_ID,
        )
    }

    pub fn session_user_ref(
        &self,
        database: String,
        user_name: String,
        user_id: UserId,
    ) -> Arc<SessionImpl> {
        Arc::new(SessionImpl::new(
            self.env.clone(),
            AuthContext::new(database, user_name, user_id),
            UserAuthenticator::None,
            // The local frontend uses a dummy session id.
            (0, 0),
            Address::Tcp(SocketAddr::new(
                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
                6666,
            ))
            .into(),
            Default::default(),
        ))
    }
}

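/// Extracts the plan text from an `EXPLAIN` response, concatenating one line per row.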
pub async fn get_explain_output(mut rsp: RwPgResponse) -> String {
    if rsp.stmt_type() != StatementType::EXPLAIN {
        panic!("RESPONSE INVALID: {rsp:?}");
    }
    let mut res = String::new();
    #[for_await]
    for row_set in rsp.values_stream() {
        for row in row_set.unwrap() {
            let row: Row = row;
            let row = row.values()[0].as_ref().unwrap();
            res += std::str::from_utf8(row).unwrap();
            res += "\n";
        }
    }
    res
}

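/// A [`CatalogWriter`] for tests that applies DDL directly to an in-memory
/// [`Catalog`], generating object ids locally instead of going through the
/// meta service.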
pub struct MockCatalogWriter {
    catalog: Arc<RwLock<Catalog>>,
    id: AtomicU32,
    table_id_to_schema_id: RwLock<HashMap<u32, SchemaId>>,
    schema_id_to_database_id: RwLock<HashMap<SchemaId, DatabaseId>>,
    hummock_snapshot_manager: HummockSnapshotManagerRef,
}

#[async_trait::async_trait]
impl CatalogWriter for MockCatalogWriter {
    async fn create_database(
        &self,
        db_name: &str,
        owner: UserId,
        resource_group: &str,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
    ) -> Result<()> {
        let database_id = DatabaseId::new(self.gen_id());
        self.catalog.write().create_database(&PbDatabase {
            name: db_name.to_owned(),
            id: database_id,
            owner,
            resource_group: resource_group.to_owned(),
            barrier_interval_ms,
            checkpoint_frequency,
        });
        self.create_schema(database_id, DEFAULT_SCHEMA_NAME, owner)
            .await?;
        self.create_schema(database_id, PG_CATALOG_SCHEMA_NAME, owner)
            .await?;
        self.create_schema(database_id, RW_CATALOG_SCHEMA_NAME, owner)
            .await?;
        Ok(())
    }

    async fn create_schema(
        &self,
        db_id: DatabaseId,
        schema_name: &str,
        owner: UserId,
    ) -> Result<()> {
        let id = self.gen_id();
        self.catalog.write().create_schema(&PbSchema {
            id,
            name: schema_name.to_owned(),
            database_id: db_id,
            owner,
        });
        self.add_schema_id(id, db_id);
        Ok(())
    }

    async fn create_materialized_view(
        &self,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
        _dependencies: HashSet<ObjectId>,
        _specific_resource_group: Option<String>,
        _if_not_exists: bool,
    ) -> Result<()> {
        table.id = self.gen_id();
        table.stream_job_status = PbStreamJobStatus::Created as _;
        table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
        self.catalog.write().create_table(&table);
        self.add_table_or_source_id(table.id.as_raw_id(), table.schema_id, table.database_id);
        self.hummock_snapshot_manager.add_table_for_test(table.id);
        Ok(())
    }

    async fn replace_materialized_view(
        &self,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
    ) -> Result<()> {
        table.stream_job_status = PbStreamJobStatus::Created as _;
        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
        self.catalog.write().update_table(&table);
        Ok(())
    }

    async fn create_view(&self, mut view: PbView, _dependencies: HashSet<ObjectId>) -> Result<()> {
        view.id = self.gen_id();
        self.catalog.write().create_view(&view);
        self.add_table_or_source_id(view.id.as_raw_id(), view.schema_id, view.database_id);
        Ok(())
    }

    async fn create_table(
        &self,
        source: Option<PbSource>,
        mut table: PbTable,
        graph: StreamFragmentGraph,
        _job_type: PbTableJobType,
        if_not_exists: bool,
        _dependencies: HashSet<ObjectId>,
    ) -> Result<()> {
        if let Some(source) = source {
            let source_id = self.create_source_inner(source)?;
            table.optional_associated_source_id = Some(source_id.into());
        }
        self.create_materialized_view(table, graph, HashSet::new(), None, if_not_exists)
            .await?;
        Ok(())
    }

    async fn replace_table(
        &self,
        _source: Option<PbSource>,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
        _job_type: TableJobType,
    ) -> Result<()> {
        table.stream_job_status = PbStreamJobStatus::Created as _;
        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
        self.catalog.write().update_table(&table);
        Ok(())
    }

    async fn replace_source(&self, source: PbSource, _graph: StreamFragmentGraph) -> Result<()> {
        self.catalog.write().update_source(&source);
        Ok(())
    }

    async fn create_source(
        &self,
        source: PbSource,
        _graph: Option<StreamFragmentGraph>,
        _if_not_exists: bool,
    ) -> Result<()> {
        self.create_source_inner(source).map(|_| ())
    }

    async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        _dependencies: HashSet<ObjectId>,
        _if_not_exists: bool,
    ) -> Result<()> {
        self.create_sink_inner(sink, graph)
    }

    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
        self.create_subscription_inner(subscription)
    }

    async fn create_index(
        &self,
        mut index: PbIndex,
        mut index_table: PbTable,
        _graph: StreamFragmentGraph,
        _if_not_exists: bool,
    ) -> Result<()> {
        index_table.id = self.gen_id();
        index_table.stream_job_status = PbStreamJobStatus::Created as _;
        index_table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
        self.catalog.write().create_table(&index_table);
        self.add_table_or_index_id(
            index_table.id.as_raw_id(),
            index_table.schema_id,
            index_table.database_id,
        );

        index.id = index_table.id.as_raw_id().into();
        index.index_table_id = index_table.id;
        self.catalog.write().create_index(&index);
        Ok(())
    }

    async fn create_function(&self, _function: PbFunction) -> Result<()> {
        unreachable!()
    }

    async fn create_connection(
        &self,
        _connection_name: String,
        _database_id: DatabaseId,
        _schema_id: SchemaId,
        _owner_id: u32,
        _connection: create_connection_request::Payload,
    ) -> Result<()> {
        unreachable!()
    }

    async fn create_secret(
        &self,
        _secret_name: String,
        _database_id: DatabaseId,
        _schema_id: SchemaId,
        _owner_id: u32,
        _payload: Vec<u8>,
    ) -> Result<()> {
        unreachable!()
    }

    async fn comment_on(&self, _comment: PbComment) -> Result<()> {
        unreachable!()
    }

    async fn drop_table(
        &self,
        source_id: Option<u32>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        if let Some(source_id) = source_id {
            self.drop_table_or_source_id(source_id);
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
        let indexes =
            self.catalog
                .read()
                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
        for index in indexes {
            self.drop_index(index.id, cascade).await?;
        }
        self.catalog
            .write()
            .drop_table(database_id, schema_id, table_id);
        if let Some(source_id) = source_id {
            self.catalog
                .write()
                .drop_source(database_id, schema_id, source_id.into());
        }
        Ok(())
    }

    async fn drop_view(&self, _view_id: ViewId, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
        let indexes =
            self.catalog
                .read()
                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
        for index in indexes {
            self.drop_index(index.id, cascade).await?;
        }
        self.catalog
            .write()
            .drop_table(database_id, schema_id, table_id);
        Ok(())
    }

    async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(source_id.as_raw_id());
        self.catalog
            .write()
            .drop_source(database_id, schema_id, source_id);
        Ok(())
    }

    async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_sink_id(sink_id.as_raw_id());
        self.catalog
            .write()
            .drop_sink(database_id, schema_id, sink_id);
        Ok(())
    }

    async fn drop_subscription(
        &self,
        subscription_id: SubscriptionId,
        cascade: bool,
    ) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) =
            self.drop_table_or_subscription_id(subscription_id.as_raw_id());
        self.catalog
            .write()
            .drop_subscription(database_id, schema_id, subscription_id);
        Ok(())
    }

    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let &schema_id = self
            .table_id_to_schema_id
            .read()
            .get(&index_id.as_raw_id())
            .unwrap();
        let database_id = self.get_database_id_by_schema(schema_id);

        let index = {
            let catalog_reader = self.catalog.read();
            let schema_catalog = catalog_reader
                .get_schema_by_id(database_id, schema_id)
                .unwrap();
            schema_catalog.get_index_by_id(index_id).unwrap().clone()
        };

        let index_table_id = index.index_table().id;
        let (database_id, schema_id) = self.drop_table_or_index_id(index_id.as_raw_id());
        self.catalog
            .write()
            .drop_index(database_id, schema_id, index_id);
        self.catalog
            .write()
            .drop_table(database_id, schema_id, index_table_id);
        Ok(())
    }

    async fn drop_function(&self, _function_id: FunctionId, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_connection(&self, _connection_id: ConnectionId, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_secret(&self, _secret_id: SecretId) -> Result<()> {
        unreachable!()
    }

    async fn drop_database(&self, database_id: DatabaseId) -> Result<()> {
        self.catalog.write().drop_database(database_id);
        Ok(())
    }

    async fn drop_schema(&self, schema_id: SchemaId, _cascade: bool) -> Result<()> {
        let database_id = self.drop_schema_id(schema_id);
        self.catalog.write().drop_schema(database_id, schema_id);
        Ok(())
    }

    async fn alter_name(
        &self,
        object_id: alter_name_request::Object,
        object_name: &str,
    ) -> Result<()> {
        match object_id {
            alter_name_request::Object::TableId(table_id) => {
                self.catalog
                    .write()
                    .alter_table_name_by_id(table_id.into(), object_name);
                Ok(())
            }
            _ => {
                unimplemented!()
            }
        }
    }

    async fn alter_source(&self, source: PbSource) -> Result<()> {
        self.catalog.write().update_source(&source);
        Ok(())
    }

    async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<()> {
        for database in self.catalog.read().iter_databases() {
            for schema in database.iter_schemas() {
                match object {
                    Object::TableId(table_id) => {
                        if let Some(table) = schema.get_created_table_by_id(TableId::from(table_id))
                        {
                            let mut pb_table = table.to_prost();
                            pb_table.owner = owner_id;
                            self.catalog.write().update_table(&pb_table);
                            return Ok(());
                        }
                    }
                    _ => unreachable!(),
                }
            }
        }

        Err(ErrorCode::ItemNotFound(format!("object not found: {:?}", object)).into())
    }

    async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> Result<()> {
        match object {
            alter_set_schema_request::Object::TableId(table_id) => {
                let mut pb_table = {
                    let reader = self.catalog.read();
                    let table = reader.get_any_table_by_id(table_id.into())?.to_owned();
                    table.to_prost()
                };
                pb_table.schema_id = new_schema_id;
                self.catalog.write().update_table(&pb_table);
                self.table_id_to_schema_id
                    .write()
                    .insert(table_id, new_schema_id);
                Ok(())
            }
            _ => unreachable!(),
        }
    }

    async fn alter_parallelism(
        &self,
        _job_id: JobId,
        _parallelism: PbTableParallelism,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }

    async fn alter_swap_rename(&self, _object: alter_swap_rename_request::Object) -> Result<()> {
        todo!()
    }

    async fn alter_secret(
        &self,
        _secret_id: SecretId,
        _secret_name: String,
        _database_id: DatabaseId,
        _schema_id: SchemaId,
        _owner_id: u32,
        _payload: Vec<u8>,
    ) -> Result<()> {
        unreachable!()
    }

    async fn alter_resource_group(
        &self,
        _table_id: TableId,
        _resource_group: Option<String>,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }

    async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> Result<()> {
        let mut pb_database = {
            let reader = self.catalog.read();
            let database = reader.get_database_by_id(database_id)?.to_owned();
            database.to_prost()
        };
        match param {
            AlterDatabaseParam::BarrierIntervalMs(interval) => {
                pb_database.barrier_interval_ms = interval;
            }
            AlterDatabaseParam::CheckpointFrequency(frequency) => {
                pb_database.checkpoint_frequency = frequency;
            }
        }
        self.catalog.write().update_database(&pb_database);
        Ok(())
    }

    async fn create_iceberg_table(
        &self,
        _table_job_info: PbTableJobInfo,
        _sink_job_info: PbSinkJobInfo,
        _iceberg_source: PbSource,
        _if_not_exists: bool,
    ) -> Result<()> {
        todo!()
    }
}

impl MockCatalogWriter {
    pub fn new(
        catalog: Arc<RwLock<Catalog>>,
        hummock_snapshot_manager: HummockSnapshotManagerRef,
    ) -> Self {
        catalog.write().create_database(&PbDatabase {
            id: 0.into(),
            name: DEFAULT_DATABASE_NAME.to_owned(),
            owner: DEFAULT_SUPER_USER_ID,
            resource_group: DEFAULT_RESOURCE_GROUP.to_owned(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        });
        catalog.write().create_schema(&PbSchema {
            id: 1.into(),
            name: DEFAULT_SCHEMA_NAME.to_owned(),
            database_id: 0.into(),
            owner: DEFAULT_SUPER_USER_ID,
        });
        catalog.write().create_schema(&PbSchema {
            id: 2.into(),
            name: PG_CATALOG_SCHEMA_NAME.to_owned(),
            database_id: 0.into(),
            owner: DEFAULT_SUPER_USER_ID,
        });
        catalog.write().create_schema(&PbSchema {
            id: 3.into(),
            name: RW_CATALOG_SCHEMA_NAME.to_owned(),
            database_id: 0.into(),
            owner: DEFAULT_SUPER_USER_ID,
        });
        let mut map: HashMap<SchemaId, DatabaseId> = HashMap::new();
        map.insert(1_u32.into(), 0_u32.into());
        map.insert(2_u32.into(), 0_u32.into());
        map.insert(3_u32.into(), 0_u32.into());
        Self {
            catalog,
            id: AtomicU32::new(3),
            table_id_to_schema_id: Default::default(),
            schema_id_to_database_id: RwLock::new(map),
            hummock_snapshot_manager,
        }
    }

    fn gen_id<T: From<u32>>(&self) -> T {
        // Id 0 is taken by the `dev` database and schema, so offset by 1 to skip it.
        (self.id.fetch_add(1, Ordering::SeqCst) + 1).into()
    }

    fn add_table_or_source_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn drop_table_or_source_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn add_table_or_sink_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn add_table_or_subscription_id(
        &self,
        table_id: u32,
        schema_id: SchemaId,
        _database_id: DatabaseId,
    ) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn add_table_or_index_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn drop_table_or_sink_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn drop_table_or_subscription_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn drop_table_or_index_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn add_schema_id(&self, schema_id: SchemaId, database_id: DatabaseId) {
        self.schema_id_to_database_id
            .write()
            .insert(schema_id, database_id);
    }

    fn drop_schema_id(&self, schema_id: SchemaId) -> DatabaseId {
        self.schema_id_to_database_id
            .write()
            .remove(&schema_id)
            .unwrap()
    }

    fn create_source_inner(&self, mut source: PbSource) -> Result<SourceId> {
        source.id = self.gen_id();
        self.catalog.write().create_source(&source);
        self.add_table_or_source_id(source.id.as_raw_id(), source.schema_id, source.database_id);
        Ok(source.id)
    }

    fn create_sink_inner(&self, mut sink: PbSink, _graph: StreamFragmentGraph) -> Result<()> {
        sink.id = self.gen_id();
        sink.stream_job_status = PbStreamJobStatus::Created as _;
        self.catalog.write().create_sink(&sink);
        self.add_table_or_sink_id(sink.id.as_raw_id(), sink.schema_id, sink.database_id);
        Ok(())
    }

    fn create_subscription_inner(&self, mut subscription: PbSubscription) -> Result<()> {
        subscription.id = self.gen_id();
        self.catalog.write().create_subscription(&subscription);
        self.add_table_or_subscription_id(
            subscription.id.as_raw_id(),
            subscription.schema_id,
            subscription.database_id,
        );
        Ok(())
    }

    fn get_database_id_by_schema(&self, schema_id: SchemaId) -> DatabaseId {
        *self
            .schema_id_to_database_id
            .read()
            .get(&schema_id)
            .unwrap()
    }
}

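/// A [`UserInfoWriter`] for tests that applies user DDL directly to an
/// in-memory [`UserInfoManager`].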
pub struct MockUserInfoWriter {
    id: AtomicU32,
    user_info: Arc<RwLock<UserInfoManager>>,
}

#[async_trait::async_trait]
impl UserInfoWriter for MockUserInfoWriter {
    async fn create_user(&self, user: UserInfo) -> Result<()> {
        let mut user = user;
        user.id = self.gen_id();
        self.user_info.write().create_user(user);
        Ok(())
    }

    async fn drop_user(&self, id: UserId) -> Result<()> {
        self.user_info.write().drop_user(id);
        Ok(())
    }

    async fn update_user(
        &self,
        update_user: UserInfo,
        update_fields: Vec<UpdateField>,
    ) -> Result<()> {
        let mut lock = self.user_info.write();
        let id = update_user.get_id();
        let Some(old_name) = lock.get_user_name_by_id(id) else {
            return Ok(());
        };
        let mut user_info = lock.get_user_by_name(&old_name).unwrap().to_prost();
        update_fields.into_iter().for_each(|field| match field {
            UpdateField::Super => user_info.is_super = update_user.is_super,
            UpdateField::Login => user_info.can_login = update_user.can_login,
            UpdateField::CreateDb => user_info.can_create_db = update_user.can_create_db,
            UpdateField::CreateUser => user_info.can_create_user = update_user.can_create_user,
            UpdateField::AuthInfo => user_info.auth_info.clone_from(&update_user.auth_info),
            UpdateField::Rename => user_info.name.clone_from(&update_user.name),
            UpdateField::Admin => user_info.is_admin = update_user.is_admin,
            UpdateField::Unspecified => unreachable!(),
        });
        // Write back the merged user info so that only the requested fields are updated.
        lock.update_user(user_info);
        Ok(())
    }

    /// `MockUserInfoWriter` does not support expanding privileges via `GrantAllTables`
    /// or `GrantAllSources` when granting privileges to a user.
    async fn grant_privilege(
        &self,
        users: Vec<UserId>,
        privileges: Vec<GrantPrivilege>,
        with_grant_option: bool,
        _grantor: UserId,
    ) -> Result<()> {
        let privileges = privileges
            .into_iter()
            .map(|mut p| {
                p.action_with_opts
                    .iter_mut()
                    .for_each(|ao| ao.with_grant_option = with_grant_option);
                p
            })
            .collect::<Vec<_>>();
        for user_id in users {
            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
                u.extend_privileges(privileges.clone());
            }
        }
        Ok(())
    }

    /// `MockUserInfoWriter` does not support expanding privileges via `RevokeAllTables`
    /// or `RevokeAllSources` when revoking privileges from a user.
    async fn revoke_privilege(
        &self,
        users: Vec<UserId>,
        privileges: Vec<GrantPrivilege>,
        _granted_by: UserId,
        _revoke_by: UserId,
        revoke_grant_option: bool,
        _cascade: bool,
    ) -> Result<()> {
        for user_id in users {
            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
                u.revoke_privileges(privileges.clone(), revoke_grant_option);
            }
        }
        Ok(())
    }

    async fn alter_default_privilege(
        &self,
        _users: Vec<UserId>,
        _database_id: DatabaseId,
        _schemas: Vec<SchemaId>,
        _operation: AlterDefaultPrivilegeOperation,
        _operated_by: UserId,
    ) -> Result<()> {
        todo!()
    }
}

impl MockUserInfoWriter {
    pub fn new(user_info: Arc<RwLock<UserInfoManager>>) -> Self {
        user_info.write().create_user(UserInfo {
            id: DEFAULT_SUPER_USER_ID,
            name: DEFAULT_SUPER_USER.to_owned(),
            is_super: true,
            can_create_db: true,
            can_create_user: true,
            can_login: true,
            ..Default::default()
        });
        user_info.write().create_user(UserInfo {
            id: DEFAULT_SUPER_USER_FOR_ADMIN_ID,
            name: DEFAULT_SUPER_USER_FOR_ADMIN.to_owned(),
            is_super: true,
            can_create_db: true,
            can_create_user: true,
            can_login: true,
            is_admin: true,
            ..Default::default()
        });
        Self {
            user_info,
            id: AtomicU32::new(NON_RESERVED_USER_ID as u32),
        }
    }

    fn gen_id(&self) -> u32 {
        self.id.fetch_add(1, Ordering::SeqCst)
    }
}

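/// A stub [`FrontendMetaClient`]: queries return empty or default values, and
/// RPCs that tests are not expected to exercise are left `unimplemented!()`.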
pub struct MockFrontendMetaClient {}

#[async_trait::async_trait]
impl FrontendMetaClient for MockFrontendMetaClient {
    async fn try_unregister(&self) {}

    async fn flush(&self, _database_id: DatabaseId) -> RpcResult<HummockVersionId> {
        Ok(INVALID_VERSION_ID)
    }

    async fn wait(&self) -> RpcResult<()> {
        Ok(())
    }

    async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult<Vec<u32>> {
        Ok(vec![])
    }

    async fn list_table_fragments(
        &self,
        _table_ids: &[JobId],
    ) -> RpcResult<HashMap<JobId, TableFragmentInfo>> {
        Ok(HashMap::default())
    }

    async fn list_streaming_job_states(&self) -> RpcResult<Vec<StreamingJobState>> {
        Ok(vec![])
    }

    async fn list_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
        Ok(vec![])
    }

    async fn list_creating_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
        Ok(vec![])
    }

    async fn list_actor_states(&self) -> RpcResult<Vec<ActorState>> {
        Ok(vec![])
    }

    async fn list_actor_splits(&self) -> RpcResult<Vec<ActorSplit>> {
        Ok(vec![])
    }

    async fn list_object_dependencies(&self) -> RpcResult<Vec<PbObjectDependencies>> {
        Ok(vec![])
    }

    async fn list_meta_snapshots(&self) -> RpcResult<Vec<MetaSnapshotMetadata>> {
        Ok(vec![])
    }

    async fn set_system_param(
        &self,
        _param: String,
        _value: Option<String>,
    ) -> RpcResult<Option<SystemParamsReader>> {
        Ok(Some(SystemParams::default().into()))
    }

    async fn get_session_params(&self) -> RpcResult<SessionConfig> {
        Ok(Default::default())
    }

    async fn set_session_param(&self, _param: String, _value: Option<String>) -> RpcResult<String> {
        Ok("".to_owned())
    }

    async fn get_ddl_progress(&self) -> RpcResult<Vec<DdlProgress>> {
        Ok(vec![])
    }

    async fn get_tables(
        &self,
        _table_ids: Vec<crate::catalog::TableId>,
        _include_dropped_tables: bool,
    ) -> RpcResult<HashMap<crate::catalog::TableId, Table>> {
        Ok(HashMap::new())
    }

    async fn list_hummock_pinned_versions(&self) -> RpcResult<Vec<(WorkerId, u64)>> {
        unimplemented!()
    }

    async fn list_refresh_table_states(&self) -> RpcResult<Vec<RefreshTableState>> {
        unimplemented!()
    }

    async fn get_hummock_current_version(&self) -> RpcResult<HummockVersion> {
        Ok(HummockVersion::default())
    }

    async fn get_hummock_checkpoint_version(&self) -> RpcResult<HummockVersion> {
        unimplemented!()
    }

    async fn list_version_deltas(&self) -> RpcResult<Vec<HummockVersionDelta>> {
        unimplemented!()
    }

    async fn list_branched_objects(&self) -> RpcResult<Vec<BranchedObject>> {
        unimplemented!()
    }

    async fn list_hummock_compaction_group_configs(&self) -> RpcResult<Vec<CompactionGroupInfo>> {
        unimplemented!()
    }

    async fn list_hummock_active_write_limits(&self) -> RpcResult<HashMap<u64, WriteLimit>> {
        unimplemented!()
    }

    async fn list_hummock_meta_configs(&self) -> RpcResult<HashMap<String, String>> {
        unimplemented!()
    }

    async fn list_event_log(&self) -> RpcResult<Vec<EventLog>> {
        Ok(vec![])
    }

    async fn list_compact_task_assignment(&self) -> RpcResult<Vec<CompactTaskAssignment>> {
        unimplemented!()
    }

    async fn list_all_nodes(&self) -> RpcResult<Vec<WorkerNode>> {
        Ok(vec![])
    }

    async fn list_compact_task_progress(&self) -> RpcResult<Vec<CompactTaskProgress>> {
        unimplemented!()
    }

    async fn recover(&self) -> RpcResult<()> {
        unimplemented!()
    }

    async fn apply_throttle(
        &self,
        _kind: PbThrottleTarget,
        _id: u32,
        _rate_limit: Option<u32>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn alter_fragment_parallelism(
        &self,
        _fragment_ids: Vec<FragmentId>,
        _parallelism: Option<PbTableParallelism>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn get_cluster_recovery_status(&self) -> RpcResult<RecoveryStatus> {
        Ok(RecoveryStatus::StatusRunning)
    }

    async fn get_cluster_limits(&self) -> RpcResult<Vec<ClusterLimit>> {
        Ok(vec![])
    }

    async fn list_rate_limits(&self) -> RpcResult<Vec<RateLimitInfo>> {
        Ok(vec![])
    }

    async fn list_cdc_progress(&self) -> RpcResult<HashMap<JobId, PbCdcProgress>> {
        Ok(HashMap::default())
    }

    async fn get_meta_store_endpoint(&self) -> RpcResult<String> {
        unimplemented!()
    }

    async fn alter_sink_props(
        &self,
        _sink_id: SinkId,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
        _connector_conn_ref: Option<ConnectionId>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn alter_iceberg_table_props(
        &self,
        _table_id: TableId,
        _sink_id: SinkId,
        _source_id: SourceId,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
        _connector_conn_ref: Option<ConnectionId>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn alter_source_connector_props(
        &self,
        _source_id: SourceId,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
        _connector_conn_ref: Option<ConnectionId>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn list_hosted_iceberg_tables(&self) -> RpcResult<Vec<IcebergTable>> {
        unimplemented!()
    }

    async fn get_fragment_by_id(
        &self,
        _fragment_id: FragmentId,
    ) -> RpcResult<Option<FragmentDistribution>> {
        unimplemented!()
    }

    async fn get_fragment_vnodes(
        &self,
        _fragment_id: FragmentId,
    ) -> RpcResult<Vec<(ActorId, Vec<u32>)>> {
        unimplemented!()
    }

    async fn get_actor_vnodes(&self, _actor_id: ActorId) -> RpcResult<Vec<u32>> {
        unimplemented!()
    }

    fn worker_id(&self) -> WorkerId {
        0.into()
    }

    async fn set_sync_log_store_aligned(&self, _job_id: JobId, _aligned: bool) -> RpcResult<()> {
        Ok(())
    }

    async fn compact_iceberg_table(&self, _sink_id: SinkId) -> RpcResult<u64> {
        Ok(1)
    }

    async fn expire_iceberg_table_snapshots(&self, _sink_id: SinkId) -> RpcResult<()> {
        Ok(())
    }

    async fn refresh(&self, _request: RefreshRequest) -> RpcResult<RefreshResponse> {
        Ok(RefreshResponse { status: None })
    }

    fn cluster_id(&self) -> &str {
        "test-cluster-uuid"
    }

    async fn list_unmigrated_tables(&self) -> RpcResult<HashMap<crate::catalog::TableId, String>> {
        unimplemented!()
    }
}

#[cfg(test)]
pub static PROTO_FILE_DATA: &str = r#"
    syntax = "proto3";
    package test;
    message TestRecord {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
    }
    message TestRecordAlterType {
        string id = 1;
        Country country = 3;
        int32 zipcode = 4;
        float rate = 5;
      }
    message TestRecordExt {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
      string name = 6;
    }
    message Country {
      string address = 1;
      City city = 2;
      string zipcode = 3;
    }
    message City {
      string address = 1;
      string zipcode = 2;
    }"#;

/// Compiles `proto_data` with `protoc` and returns the resulting descriptor file.
/// (`NamedTempFile` automatically deletes the file when it goes out of scope.)
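///
/// Usage sketch (hypothetical; requires `protoc` on `PATH`):
///
/// ```ignore
/// let file = create_proto_file(PROTO_FILE_DATA);
/// let descriptor_path = file.path().to_string_lossy().into_owned();
/// ```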
pub fn create_proto_file(proto_data: &str) -> NamedTempFile {
    let in_file = Builder::new()
        .prefix("temp")
        .suffix(".proto")
        .rand_bytes(8)
        .tempfile()
        .unwrap();

    let out_file = Builder::new()
        .prefix("temp")
        .suffix(".pb")
        .rand_bytes(8)
        .tempfile()
        .unwrap();

    let mut file = in_file.as_file();
    file.write_all(proto_data.as_ref())
        .expect("writing binary to test file");
    file.flush().expect("flush temp file failed");
    let include_path = in_file
        .path()
        .parent()
        .unwrap()
        .to_string_lossy()
        .into_owned();
    let out_path = out_file.path().to_string_lossy().into_owned();
    let in_path = in_file.path().to_string_lossy().into_owned();
    let mut compile = std::process::Command::new("protoc");

    let out = compile
        .arg("--include_imports")
        .arg("-I")
        .arg(include_path)
        .arg(format!("--descriptor_set_out={}", out_path))
        .arg(in_path)
        .output()
        .expect("failed to compile proto");
    if !out.status.success() {
        panic!("compile proto failed \n output: {:?}", out);
    }
    out_file
}