1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::io::Write;
17use std::net::{IpAddr, Ipv4Addr, SocketAddr};
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU32, Ordering};
20
21use futures_async_stream::for_await;
22use parking_lot::RwLock;
23use pgwire::net::{Address, AddressRef};
24use pgwire::pg_response::StatementType;
25use pgwire::pg_server::{BoxedError, SessionId, SessionManager, UserAuthenticator};
26use pgwire::types::Row;
27use risingwave_common::catalog::{
28    AlterDatabaseParam, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER,
29    DEFAULT_SUPER_USER_FOR_ADMIN, DEFAULT_SUPER_USER_FOR_ADMIN_ID, DEFAULT_SUPER_USER_ID,
30    FunctionId, IndexId, NON_RESERVED_USER_ID, ObjectId, PG_CATALOG_SCHEMA_NAME,
31    RW_CATALOG_SCHEMA_NAME, TableId,
32};
33use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat};
34use risingwave_common::session_config::SessionConfig;
35use risingwave_common::system_param::reader::SystemParamsReader;
36use risingwave_common::util::cluster_limit::ClusterLimit;
37use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
38use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
39use risingwave_hummock_sdk::{HummockVersionId, INVALID_VERSION_ID};
40use risingwave_pb::backup_service::MetaSnapshotMetadata;
41use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
42use risingwave_pb::catalog::{
43    PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbStreamJobStatus,
44    PbSubscription, PbTable, PbView, Table,
45};
46use risingwave_pb::common::WorkerNode;
47use risingwave_pb::ddl_service::alter_owner_request::Object;
48use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
49use risingwave_pb::ddl_service::{
50    DdlProgress, PbTableJobType, TableJobType, alter_name_request, alter_set_schema_request,
51    alter_swap_rename_request, create_connection_request,
52};
53use risingwave_pb::hummock::write_limits::WriteLimit;
54use risingwave_pb::hummock::{
55    BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
56};
57use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
58use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
59use risingwave_pb::meta::list_actor_states_response::ActorState;
60use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
61use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
62use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
63use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
64use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
65use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
66use risingwave_pb::meta::{
67    EventLog, FragmentDistribution, PbTableParallelism, PbThrottleTarget, RecoveryStatus,
68    RefreshRequest, RefreshResponse, SystemParams,
69};
70use risingwave_pb::secret::PbSecretRef;
71use risingwave_pb::stream_plan::StreamFragmentGraph;
72use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
73use risingwave_pb::user::update_user_request::UpdateField;
74use risingwave_pb::user::{GrantPrivilege, UserInfo};
75use risingwave_rpc_client::error::Result as RpcResult;
76use tempfile::{Builder, NamedTempFile};
77
78use crate::FrontendOpts;
79use crate::catalog::catalog_service::CatalogWriter;
80use crate::catalog::root_catalog::Catalog;
81use crate::catalog::{DatabaseId, SchemaId, SecretId, SinkId};
82use crate::error::{ErrorCode, Result};
83use crate::handler::RwPgResponse;
84use crate::meta_client::FrontendMetaClient;
85use crate::scheduler::HummockSnapshotManagerRef;
86use crate::session::{AuthContext, FrontendEnv, SessionImpl};
87use crate::user::UserId;
88use crate::user::user_manager::UserInfoManager;
89use crate::user::user_service::UserInfoWriter;
90
91pub struct LocalFrontend {
93    pub opts: FrontendOpts,
94    env: FrontendEnv,
95}
96
97impl SessionManager for LocalFrontend {
98    type Session = SessionImpl;
99
100    fn create_dummy_session(
101        &self,
102        _database_id: u32,
103        _user_name: u32,
104    ) -> std::result::Result<Arc<Self::Session>, BoxedError> {
105        unreachable!()
106    }
107
108    fn connect(
109        &self,
110        _database: &str,
111        _user_name: &str,
112        _peer_addr: AddressRef,
113    ) -> std::result::Result<Arc<Self::Session>, BoxedError> {
114        Ok(self.session_ref())
115    }
116
117    fn cancel_queries_in_session(&self, _session_id: SessionId) {
118        unreachable!()
119    }
120
121    fn cancel_creating_jobs_in_session(&self, _session_id: SessionId) {
122        unreachable!()
123    }
124
125    fn end_session(&self, _session: &Self::Session) {
126        unreachable!()
127    }
128}
129
130impl LocalFrontend {
131    #[expect(clippy::unused_async)]
132    pub async fn new(opts: FrontendOpts) -> Self {
133        let env = FrontendEnv::mock();
134        Self { opts, env }
135    }
136
137    pub async fn run_sql(
138        &self,
139        sql: impl Into<String>,
140    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
141        let sql: Arc<str> = Arc::from(sql.into());
142        self.session_ref().run_statement(sql, vec![]).await
143    }
144
145    pub async fn run_sql_with_session(
146        &self,
147        session_ref: Arc<SessionImpl>,
148        sql: impl Into<String>,
149    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
150        let sql: Arc<str> = Arc::from(sql.into());
151        session_ref.run_statement(sql, vec![]).await
152    }
153
154    pub async fn run_user_sql(
155        &self,
156        sql: impl Into<String>,
157        database: String,
158        user_name: String,
159        user_id: UserId,
160    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
161        let sql: Arc<str> = Arc::from(sql.into());
162        self.session_user_ref(database, user_name, user_id)
163            .run_statement(sql, vec![])
164            .await
165    }
166
167    pub async fn query_formatted_result(&self, sql: impl Into<String>) -> Vec<String> {
168        let mut rsp = self.run_sql(sql).await.unwrap();
169        let mut res = vec![];
170        #[for_await]
171        for row_set in rsp.values_stream() {
172            for row in row_set.unwrap() {
173                res.push(format!("{:?}", row));
174            }
175        }
176        res
177    }
178
179    pub async fn get_explain_output(&self, sql: impl Into<String>) -> String {
180        let mut rsp = self.run_sql(sql).await.unwrap();
181        assert_eq!(rsp.stmt_type(), StatementType::EXPLAIN);
182        let mut res = String::new();
183        #[for_await]
184        for row_set in rsp.values_stream() {
185            for row in row_set.unwrap() {
186                let row: Row = row;
187                let row = row.values()[0].as_ref().unwrap();
188                res += std::str::from_utf8(row).unwrap();
189                res += "\n";
190            }
191        }
192        res
193    }
194
195    pub fn session_ref(&self) -> Arc<SessionImpl> {
197        self.session_user_ref(
198            DEFAULT_DATABASE_NAME.to_owned(),
199            DEFAULT_SUPER_USER.to_owned(),
200            DEFAULT_SUPER_USER_ID,
201        )
202    }
203
204    pub fn session_user_ref(
205        &self,
206        database: String,
207        user_name: String,
208        user_id: UserId,
209    ) -> Arc<SessionImpl> {
210        Arc::new(SessionImpl::new(
211            self.env.clone(),
212            AuthContext::new(database, user_name, user_id),
213            UserAuthenticator::None,
214            (0, 0),
216            Address::Tcp(SocketAddr::new(
217                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
218                6666,
219            ))
220            .into(),
221            Default::default(),
222        ))
223    }
224}
225
226pub async fn get_explain_output(mut rsp: RwPgResponse) -> String {
227    if rsp.stmt_type() != StatementType::EXPLAIN {
228        panic!("RESPONSE INVALID: {rsp:?}");
229    }
230    let mut res = String::new();
231    #[for_await]
232    for row_set in rsp.values_stream() {
233        for row in row_set.unwrap() {
234            let row: Row = row;
235            let row = row.values()[0].as_ref().unwrap();
236            res += std::str::from_utf8(row).unwrap();
237            res += "\n";
238        }
239    }
240    res
241}
242
243pub struct MockCatalogWriter {
244    catalog: Arc<RwLock<Catalog>>,
245    id: AtomicU32,
246    table_id_to_schema_id: RwLock<HashMap<u32, SchemaId>>,
247    schema_id_to_database_id: RwLock<HashMap<u32, DatabaseId>>,
248    hummock_snapshot_manager: HummockSnapshotManagerRef,
249}
250
251#[async_trait::async_trait]
252impl CatalogWriter for MockCatalogWriter {
253    async fn create_database(
254        &self,
255        db_name: &str,
256        owner: UserId,
257        resource_group: &str,
258        barrier_interval_ms: Option<u32>,
259        checkpoint_frequency: Option<u64>,
260    ) -> Result<()> {
261        let database_id = self.gen_id();
262        self.catalog.write().create_database(&PbDatabase {
263            name: db_name.to_owned(),
264            id: database_id,
265            owner,
266            resource_group: resource_group.to_owned(),
267            barrier_interval_ms,
268            checkpoint_frequency,
269        });
270        self.create_schema(database_id, DEFAULT_SCHEMA_NAME, owner)
271            .await?;
272        self.create_schema(database_id, PG_CATALOG_SCHEMA_NAME, owner)
273            .await?;
274        self.create_schema(database_id, RW_CATALOG_SCHEMA_NAME, owner)
275            .await?;
276        Ok(())
277    }
278
279    async fn create_schema(
280        &self,
281        db_id: DatabaseId,
282        schema_name: &str,
283        owner: UserId,
284    ) -> Result<()> {
285        let id = self.gen_id();
286        self.catalog.write().create_schema(&PbSchema {
287            id,
288            name: schema_name.to_owned(),
289            database_id: db_id,
290            owner,
291        });
292        self.add_schema_id(id, db_id);
293        Ok(())
294    }
295
296    async fn create_materialized_view(
297        &self,
298        mut table: PbTable,
299        _graph: StreamFragmentGraph,
300        _dependencies: HashSet<ObjectId>,
301        _specific_resource_group: Option<String>,
302        _if_not_exists: bool,
303    ) -> Result<()> {
304        table.id = self.gen_id();
305        table.stream_job_status = PbStreamJobStatus::Created as _;
306        table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
307        self.catalog.write().create_table(&table);
308        self.add_table_or_source_id(table.id, table.schema_id, table.database_id);
309        self.hummock_snapshot_manager
310            .add_table_for_test(TableId::new(table.id));
311        Ok(())
312    }
313
314    async fn replace_materialized_view(
315        &self,
316        mut table: PbTable,
317        _graph: StreamFragmentGraph,
318    ) -> Result<()> {
319        table.stream_job_status = PbStreamJobStatus::Created as _;
320        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
321        self.catalog.write().update_table(&table);
322        Ok(())
323    }
324
325    async fn create_view(&self, mut view: PbView, _dependencies: HashSet<ObjectId>) -> Result<()> {
326        view.id = self.gen_id();
327        self.catalog.write().create_view(&view);
328        self.add_table_or_source_id(view.id, view.schema_id, view.database_id);
329        Ok(())
330    }
331
332    async fn create_table(
333        &self,
334        source: Option<PbSource>,
335        mut table: PbTable,
336        graph: StreamFragmentGraph,
337        _job_type: PbTableJobType,
338        if_not_exists: bool,
339        _dependencies: HashSet<ObjectId>,
340    ) -> Result<()> {
341        if let Some(source) = source {
342            let source_id = self.create_source_inner(source)?;
343            table.optional_associated_source_id =
344                Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id));
345        }
346        self.create_materialized_view(table, graph, HashSet::new(), None, if_not_exists)
347            .await?;
348        Ok(())
349    }
350
351    async fn replace_table(
352        &self,
353        _source: Option<PbSource>,
354        mut table: PbTable,
355        _graph: StreamFragmentGraph,
356        _job_type: TableJobType,
357    ) -> Result<()> {
358        table.stream_job_status = PbStreamJobStatus::Created as _;
359        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
360        self.catalog.write().update_table(&table);
361        Ok(())
362    }
363
364    async fn replace_source(&self, source: PbSource, _graph: StreamFragmentGraph) -> Result<()> {
365        self.catalog.write().update_source(&source);
366        Ok(())
367    }
368
369    async fn create_source(
370        &self,
371        source: PbSource,
372        _graph: Option<StreamFragmentGraph>,
373        _if_not_exists: bool,
374    ) -> Result<()> {
375        self.create_source_inner(source).map(|_| ())
376    }
377
378    async fn create_sink(
379        &self,
380        sink: PbSink,
381        graph: StreamFragmentGraph,
382        _dependencies: HashSet<ObjectId>,
383        _if_not_exists: bool,
384    ) -> Result<()> {
385        self.create_sink_inner(sink, graph)
386    }
387
388    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
389        self.create_subscription_inner(subscription)
390    }
391
392    async fn create_index(
393        &self,
394        mut index: PbIndex,
395        mut index_table: PbTable,
396        _graph: StreamFragmentGraph,
397        _if_not_exists: bool,
398    ) -> Result<()> {
399        index_table.id = self.gen_id();
400        index_table.stream_job_status = PbStreamJobStatus::Created as _;
401        index_table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
402        self.catalog.write().create_table(&index_table);
403        self.add_table_or_index_id(
404            index_table.id,
405            index_table.schema_id,
406            index_table.database_id,
407        );
408
409        index.id = index_table.id;
410        index.index_table_id = index_table.id;
411        self.catalog.write().create_index(&index);
412        Ok(())
413    }
414
415    async fn create_function(&self, _function: PbFunction) -> Result<()> {
416        unreachable!()
417    }
418
419    async fn create_connection(
420        &self,
421        _connection_name: String,
422        _database_id: u32,
423        _schema_id: u32,
424        _owner_id: u32,
425        _connection: create_connection_request::Payload,
426    ) -> Result<()> {
427        unreachable!()
428    }
429
430    async fn create_secret(
431        &self,
432        _secret_name: String,
433        _database_id: u32,
434        _schema_id: u32,
435        _owner_id: u32,
436        _payload: Vec<u8>,
437    ) -> Result<()> {
438        unreachable!()
439    }
440
441    async fn comment_on(&self, _comment: PbComment) -> Result<()> {
442        unreachable!()
443    }
444
445    async fn drop_table(
446        &self,
447        source_id: Option<u32>,
448        table_id: TableId,
449        cascade: bool,
450    ) -> Result<()> {
451        if cascade {
452            return Err(ErrorCode::NotSupported(
453                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
454                "use drop instead".to_owned(),
455            )
456            .into());
457        }
458        if let Some(source_id) = source_id {
459            self.drop_table_or_source_id(source_id);
460        }
461        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.table_id);
462        let indexes =
463            self.catalog
464                .read()
465                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
466        for index in indexes {
467            self.drop_index(index.id, cascade).await?;
468        }
469        self.catalog
470            .write()
471            .drop_table(database_id, schema_id, table_id);
472        if let Some(source_id) = source_id {
473            self.catalog
474                .write()
475                .drop_source(database_id, schema_id, source_id);
476        }
477        Ok(())
478    }
479
480    async fn drop_view(&self, _view_id: u32, _cascade: bool) -> Result<()> {
481        unreachable!()
482    }
483
484    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
485        if cascade {
486            return Err(ErrorCode::NotSupported(
487                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
488                "use drop instead".to_owned(),
489            )
490            .into());
491        }
492        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.table_id);
493        let indexes =
494            self.catalog
495                .read()
496                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
497        for index in indexes {
498            self.drop_index(index.id, cascade).await?;
499        }
500        self.catalog
501            .write()
502            .drop_table(database_id, schema_id, table_id);
503        Ok(())
504    }
505
506    async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<()> {
507        if cascade {
508            return Err(ErrorCode::NotSupported(
509                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
510                "use drop instead".to_owned(),
511            )
512            .into());
513        }
514        let (database_id, schema_id) = self.drop_table_or_source_id(source_id);
515        self.catalog
516            .write()
517            .drop_source(database_id, schema_id, source_id);
518        Ok(())
519    }
520
521    async fn drop_sink(&self, sink_id: u32, cascade: bool) -> Result<()> {
522        if cascade {
523            return Err(ErrorCode::NotSupported(
524                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
525                "use drop instead".to_owned(),
526            )
527            .into());
528        }
529        let (database_id, schema_id) = self.drop_table_or_sink_id(sink_id);
530        self.catalog
531            .write()
532            .drop_sink(database_id, schema_id, sink_id);
533        Ok(())
534    }
535
536    async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()> {
537        if cascade {
538            return Err(ErrorCode::NotSupported(
539                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
540                "use drop instead".to_owned(),
541            )
542            .into());
543        }
544        let (database_id, schema_id) = self.drop_table_or_subscription_id(subscription_id);
545        self.catalog
546            .write()
547            .drop_subscription(database_id, schema_id, subscription_id);
548        Ok(())
549    }
550
551    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
552        if cascade {
553            return Err(ErrorCode::NotSupported(
554                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
555                "use drop instead".to_owned(),
556            )
557            .into());
558        }
559        let &schema_id = self
560            .table_id_to_schema_id
561            .read()
562            .get(&index_id.index_id)
563            .unwrap();
564        let database_id = self.get_database_id_by_schema(schema_id);
565
566        let index = {
567            let catalog_reader = self.catalog.read();
568            let schema_catalog = catalog_reader
569                .get_schema_by_id(&database_id, &schema_id)
570                .unwrap();
571            schema_catalog.get_index_by_id(&index_id).unwrap().clone()
572        };
573
574        let index_table_id = index.index_table().id;
575        let (database_id, schema_id) = self.drop_table_or_index_id(index_id.index_id);
576        self.catalog
577            .write()
578            .drop_index(database_id, schema_id, index_id);
579        self.catalog
580            .write()
581            .drop_table(database_id, schema_id, index_table_id);
582        Ok(())
583    }
584
585    async fn drop_function(&self, _function_id: FunctionId, _cascade: bool) -> Result<()> {
586        unreachable!()
587    }
588
589    async fn drop_connection(&self, _connection_id: u32, _cascade: bool) -> Result<()> {
590        unreachable!()
591    }
592
593    async fn drop_secret(&self, _secret_id: SecretId) -> Result<()> {
594        unreachable!()
595    }
596
597    async fn drop_database(&self, database_id: u32) -> Result<()> {
598        self.catalog.write().drop_database(database_id);
599        Ok(())
600    }
601
602    async fn drop_schema(&self, schema_id: u32, _cascade: bool) -> Result<()> {
603        let database_id = self.drop_schema_id(schema_id);
604        self.catalog.write().drop_schema(database_id, schema_id);
605        Ok(())
606    }
607
608    async fn alter_name(
609        &self,
610        object_id: alter_name_request::Object,
611        object_name: &str,
612    ) -> Result<()> {
613        match object_id {
614            alter_name_request::Object::TableId(table_id) => {
615                self.catalog
616                    .write()
617                    .alter_table_name_by_id(&table_id.into(), object_name);
618                Ok(())
619            }
620            _ => {
621                unimplemented!()
622            }
623        }
624    }
625
626    async fn alter_source(&self, source: PbSource) -> Result<()> {
627        self.catalog.write().update_source(&source);
628        Ok(())
629    }
630
631    async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<()> {
632        for database in self.catalog.read().iter_databases() {
633            for schema in database.iter_schemas() {
634                match object {
635                    Object::TableId(table_id) => {
636                        if let Some(table) =
637                            schema.get_created_table_by_id(&TableId::from(table_id))
638                        {
639                            let mut pb_table = table.to_prost();
640                            pb_table.owner = owner_id;
641                            self.catalog.write().update_table(&pb_table);
642                            return Ok(());
643                        }
644                    }
645                    _ => unreachable!(),
646                }
647            }
648        }
649
650        Err(ErrorCode::ItemNotFound(format!("object not found: {:?}", object)).into())
651    }
652
653    async fn alter_set_schema(
654        &self,
655        object: alter_set_schema_request::Object,
656        new_schema_id: u32,
657    ) -> Result<()> {
658        match object {
659            alter_set_schema_request::Object::TableId(table_id) => {
660                let mut pb_table = {
661                    let reader = self.catalog.read();
662                    let table = reader.get_any_table_by_id(&table_id.into())?.to_owned();
663                    table.to_prost()
664                };
665                pb_table.schema_id = new_schema_id;
666                self.catalog.write().update_table(&pb_table);
667                self.table_id_to_schema_id
668                    .write()
669                    .insert(table_id, new_schema_id);
670                Ok(())
671            }
672            _ => unreachable!(),
673        }
674    }
675
676    async fn alter_parallelism(
677        &self,
678        _table_id: u32,
679        _parallelism: PbTableParallelism,
680        _deferred: bool,
681    ) -> Result<()> {
682        todo!()
683    }
684
685    async fn alter_swap_rename(&self, _object: alter_swap_rename_request::Object) -> Result<()> {
686        todo!()
687    }
688
689    async fn alter_secret(
690        &self,
691        _secret_id: u32,
692        _secret_name: String,
693        _database_id: u32,
694        _schema_id: u32,
695        _owner_id: u32,
696        _payload: Vec<u8>,
697    ) -> Result<()> {
698        unreachable!()
699    }
700
701    async fn alter_resource_group(
702        &self,
703        _table_id: u32,
704        _resource_group: Option<String>,
705        _deferred: bool,
706    ) -> Result<()> {
707        todo!()
708    }
709
710    async fn alter_database_param(
711        &self,
712        database_id: u32,
713        param: AlterDatabaseParam,
714    ) -> Result<()> {
715        let mut pb_database = {
716            let reader = self.catalog.read();
717            let database = reader.get_database_by_id(&database_id)?.to_owned();
718            database.to_prost()
719        };
720        match param {
721            AlterDatabaseParam::BarrierIntervalMs(interval) => {
722                pb_database.barrier_interval_ms = interval;
723            }
724            AlterDatabaseParam::CheckpointFrequency(frequency) => {
725                pb_database.checkpoint_frequency = frequency;
726            }
727        }
728        self.catalog.write().update_database(&pb_database);
729        Ok(())
730    }
731
732    async fn create_iceberg_table(
733        &self,
734        _table_job_info: PbTableJobInfo,
735        _sink_job_info: PbSinkJobInfo,
736        _iceberg_source: PbSource,
737        _if_not_exists: bool,
738    ) -> Result<()> {
739        todo!()
740    }
741}
742
743impl MockCatalogWriter {
744    pub fn new(
745        catalog: Arc<RwLock<Catalog>>,
746        hummock_snapshot_manager: HummockSnapshotManagerRef,
747    ) -> Self {
748        catalog.write().create_database(&PbDatabase {
749            id: 0,
750            name: DEFAULT_DATABASE_NAME.to_owned(),
751            owner: DEFAULT_SUPER_USER_ID,
752            resource_group: DEFAULT_RESOURCE_GROUP.to_owned(),
753            barrier_interval_ms: None,
754            checkpoint_frequency: None,
755        });
756        catalog.write().create_schema(&PbSchema {
757            id: 1,
758            name: DEFAULT_SCHEMA_NAME.to_owned(),
759            database_id: 0,
760            owner: DEFAULT_SUPER_USER_ID,
761        });
762        catalog.write().create_schema(&PbSchema {
763            id: 2,
764            name: PG_CATALOG_SCHEMA_NAME.to_owned(),
765            database_id: 0,
766            owner: DEFAULT_SUPER_USER_ID,
767        });
768        catalog.write().create_schema(&PbSchema {
769            id: 3,
770            name: RW_CATALOG_SCHEMA_NAME.to_owned(),
771            database_id: 0,
772            owner: DEFAULT_SUPER_USER_ID,
773        });
774        let mut map: HashMap<u32, DatabaseId> = HashMap::new();
775        map.insert(1_u32, 0_u32);
776        map.insert(2_u32, 0_u32);
777        map.insert(3_u32, 0_u32);
778        Self {
779            catalog,
780            id: AtomicU32::new(3),
781            table_id_to_schema_id: Default::default(),
782            schema_id_to_database_id: RwLock::new(map),
783            hummock_snapshot_manager,
784        }
785    }
786
787    fn gen_id(&self) -> u32 {
788        self.id.fetch_add(1, Ordering::SeqCst) + 1
790    }
791
792    fn add_table_or_source_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
793        self.table_id_to_schema_id
794            .write()
795            .insert(table_id, schema_id);
796    }
797
798    fn drop_table_or_source_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
799        let schema_id = self
800            .table_id_to_schema_id
801            .write()
802            .remove(&table_id)
803            .unwrap();
804        (self.get_database_id_by_schema(schema_id), schema_id)
805    }
806
807    fn add_table_or_sink_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
808        self.table_id_to_schema_id
809            .write()
810            .insert(table_id, schema_id);
811    }
812
813    fn add_table_or_subscription_id(
814        &self,
815        table_id: u32,
816        schema_id: SchemaId,
817        _database_id: DatabaseId,
818    ) {
819        self.table_id_to_schema_id
820            .write()
821            .insert(table_id, schema_id);
822    }
823
824    fn add_table_or_index_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
825        self.table_id_to_schema_id
826            .write()
827            .insert(table_id, schema_id);
828    }
829
830    fn drop_table_or_sink_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
831        let schema_id = self
832            .table_id_to_schema_id
833            .write()
834            .remove(&table_id)
835            .unwrap();
836        (self.get_database_id_by_schema(schema_id), schema_id)
837    }
838
839    fn drop_table_or_subscription_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
840        let schema_id = self
841            .table_id_to_schema_id
842            .write()
843            .remove(&table_id)
844            .unwrap();
845        (self.get_database_id_by_schema(schema_id), schema_id)
846    }
847
848    fn drop_table_or_index_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
849        let schema_id = self
850            .table_id_to_schema_id
851            .write()
852            .remove(&table_id)
853            .unwrap();
854        (self.get_database_id_by_schema(schema_id), schema_id)
855    }
856
857    fn add_schema_id(&self, schema_id: u32, database_id: DatabaseId) {
858        self.schema_id_to_database_id
859            .write()
860            .insert(schema_id, database_id);
861    }
862
863    fn drop_schema_id(&self, schema_id: u32) -> DatabaseId {
864        self.schema_id_to_database_id
865            .write()
866            .remove(&schema_id)
867            .unwrap()
868    }
869
870    fn create_source_inner(&self, mut source: PbSource) -> Result<u32> {
871        source.id = self.gen_id();
872        self.catalog.write().create_source(&source);
873        self.add_table_or_source_id(source.id, source.schema_id, source.database_id);
874        Ok(source.id)
875    }
876
877    fn create_sink_inner(&self, mut sink: PbSink, _graph: StreamFragmentGraph) -> Result<()> {
878        sink.id = self.gen_id();
879        sink.stream_job_status = PbStreamJobStatus::Created as _;
880        self.catalog.write().create_sink(&sink);
881        self.add_table_or_sink_id(sink.id, sink.schema_id, sink.database_id);
882        Ok(())
883    }
884
885    fn create_subscription_inner(&self, mut subscription: PbSubscription) -> Result<()> {
886        subscription.id = self.gen_id();
887        self.catalog.write().create_subscription(&subscription);
888        self.add_table_or_subscription_id(
889            subscription.id,
890            subscription.schema_id,
891            subscription.database_id,
892        );
893        Ok(())
894    }
895
896    fn get_database_id_by_schema(&self, schema_id: u32) -> DatabaseId {
897        *self
898            .schema_id_to_database_id
899            .read()
900            .get(&schema_id)
901            .unwrap()
902    }
903}
904
905pub struct MockUserInfoWriter {
906    id: AtomicU32,
907    user_info: Arc<RwLock<UserInfoManager>>,
908}
909
910#[async_trait::async_trait]
911impl UserInfoWriter for MockUserInfoWriter {
912    async fn create_user(&self, user: UserInfo) -> Result<()> {
913        let mut user = user;
914        user.id = self.gen_id();
915        self.user_info.write().create_user(user);
916        Ok(())
917    }
918
919    async fn drop_user(&self, id: UserId) -> Result<()> {
920        self.user_info.write().drop_user(id);
921        Ok(())
922    }
923
924    async fn update_user(
925        &self,
926        update_user: UserInfo,
927        update_fields: Vec<UpdateField>,
928    ) -> Result<()> {
929        let mut lock = self.user_info.write();
930        let id = update_user.get_id();
931        let Some(old_name) = lock.get_user_name_by_id(id) else {
932            return Ok(());
933        };
934        let mut user_info = lock.get_user_by_name(&old_name).unwrap().to_prost();
935        update_fields.into_iter().for_each(|field| match field {
936            UpdateField::Super => user_info.is_super = update_user.is_super,
937            UpdateField::Login => user_info.can_login = update_user.can_login,
938            UpdateField::CreateDb => user_info.can_create_db = update_user.can_create_db,
939            UpdateField::CreateUser => user_info.can_create_user = update_user.can_create_user,
940            UpdateField::AuthInfo => user_info.auth_info.clone_from(&update_user.auth_info),
941            UpdateField::Rename => user_info.name.clone_from(&update_user.name),
942            UpdateField::Admin => user_info.is_admin = update_user.is_admin,
943            UpdateField::Unspecified => unreachable!(),
944        });
945        lock.update_user(update_user);
946        Ok(())
947    }
948
949    async fn grant_privilege(
952        &self,
953        users: Vec<UserId>,
954        privileges: Vec<GrantPrivilege>,
955        with_grant_option: bool,
956        _grantor: UserId,
957    ) -> Result<()> {
958        let privileges = privileges
959            .into_iter()
960            .map(|mut p| {
961                p.action_with_opts
962                    .iter_mut()
963                    .for_each(|ao| ao.with_grant_option = with_grant_option);
964                p
965            })
966            .collect::<Vec<_>>();
967        for user_id in users {
968            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
969                u.extend_privileges(privileges.clone());
970            }
971        }
972        Ok(())
973    }
974
975    async fn revoke_privilege(
978        &self,
979        users: Vec<UserId>,
980        privileges: Vec<GrantPrivilege>,
981        _granted_by: UserId,
982        _revoke_by: UserId,
983        revoke_grant_option: bool,
984        _cascade: bool,
985    ) -> Result<()> {
986        for user_id in users {
987            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
988                u.revoke_privileges(privileges.clone(), revoke_grant_option);
989            }
990        }
991        Ok(())
992    }
993
994    async fn alter_default_privilege(
995        &self,
996        _users: Vec<UserId>,
997        _database_id: DatabaseId,
998        _schemas: Vec<SchemaId>,
999        _operation: AlterDefaultPrivilegeOperation,
1000        _operated_by: UserId,
1001    ) -> Result<()> {
1002        todo!()
1003    }
1004}
1005
1006impl MockUserInfoWriter {
1007    pub fn new(user_info: Arc<RwLock<UserInfoManager>>) -> Self {
1008        user_info.write().create_user(UserInfo {
1009            id: DEFAULT_SUPER_USER_ID,
1010            name: DEFAULT_SUPER_USER.to_owned(),
1011            is_super: true,
1012            can_create_db: true,
1013            can_create_user: true,
1014            can_login: true,
1015            ..Default::default()
1016        });
1017        user_info.write().create_user(UserInfo {
1018            id: DEFAULT_SUPER_USER_FOR_ADMIN_ID,
1019            name: DEFAULT_SUPER_USER_FOR_ADMIN.to_owned(),
1020            is_super: true,
1021            can_create_db: true,
1022            can_create_user: true,
1023            can_login: true,
1024            is_admin: true,
1025            ..Default::default()
1026        });
1027        Self {
1028            user_info,
1029            id: AtomicU32::new(NON_RESERVED_USER_ID as u32),
1030        }
1031    }
1032
1033    fn gen_id(&self) -> u32 {
1034        self.id.fetch_add(1, Ordering::SeqCst)
1035    }
1036}
1037
1038pub struct MockFrontendMetaClient {}
1039
1040#[async_trait::async_trait]
1041impl FrontendMetaClient for MockFrontendMetaClient {
1042    async fn try_unregister(&self) {}
1043
1044    async fn flush(&self, _database_id: DatabaseId) -> RpcResult<HummockVersionId> {
1045        Ok(INVALID_VERSION_ID)
1046    }
1047
1048    async fn wait(&self) -> RpcResult<()> {
1049        Ok(())
1050    }
1051
1052    async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult<Vec<u32>> {
1053        Ok(vec![])
1054    }
1055
1056    async fn list_table_fragments(
1057        &self,
1058        _table_ids: &[u32],
1059    ) -> RpcResult<HashMap<u32, TableFragmentInfo>> {
1060        Ok(HashMap::default())
1061    }
1062
1063    async fn list_streaming_job_states(&self) -> RpcResult<Vec<StreamingJobState>> {
1064        Ok(vec![])
1065    }
1066
1067    async fn list_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
1068        Ok(vec![])
1069    }
1070
1071    async fn list_creating_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
1072        Ok(vec![])
1073    }
1074
1075    async fn list_actor_states(&self) -> RpcResult<Vec<ActorState>> {
1076        Ok(vec![])
1077    }
1078
1079    async fn list_actor_splits(&self) -> RpcResult<Vec<ActorSplit>> {
1080        Ok(vec![])
1081    }
1082
1083    async fn list_object_dependencies(&self) -> RpcResult<Vec<PbObjectDependencies>> {
1084        Ok(vec![])
1085    }
1086
1087    async fn list_meta_snapshots(&self) -> RpcResult<Vec<MetaSnapshotMetadata>> {
1088        Ok(vec![])
1089    }
1090
1091    async fn set_system_param(
1092        &self,
1093        _param: String,
1094        _value: Option<String>,
1095    ) -> RpcResult<Option<SystemParamsReader>> {
1096        Ok(Some(SystemParams::default().into()))
1097    }
1098
1099    async fn get_session_params(&self) -> RpcResult<SessionConfig> {
1100        Ok(Default::default())
1101    }
1102
1103    async fn set_session_param(&self, _param: String, _value: Option<String>) -> RpcResult<String> {
1104        Ok("".to_owned())
1105    }
1106
1107    async fn get_ddl_progress(&self) -> RpcResult<Vec<DdlProgress>> {
1108        Ok(vec![])
1109    }
1110
1111    async fn get_tables(
1112        &self,
1113        _table_ids: &[u32],
1114        _include_dropped_tables: bool,
1115    ) -> RpcResult<HashMap<u32, Table>> {
1116        Ok(HashMap::new())
1117    }
1118
1119    async fn list_hummock_pinned_versions(&self) -> RpcResult<Vec<(u32, u64)>> {
1120        unimplemented!()
1121    }
1122
1123    async fn get_hummock_current_version(&self) -> RpcResult<HummockVersion> {
1124        Ok(HummockVersion::default())
1125    }
1126
1127    async fn get_hummock_checkpoint_version(&self) -> RpcResult<HummockVersion> {
1128        unimplemented!()
1129    }
1130
1131    async fn list_version_deltas(&self) -> RpcResult<Vec<HummockVersionDelta>> {
1132        unimplemented!()
1133    }
1134
1135    async fn list_branched_objects(&self) -> RpcResult<Vec<BranchedObject>> {
1136        unimplemented!()
1137    }
1138
1139    async fn list_hummock_compaction_group_configs(&self) -> RpcResult<Vec<CompactionGroupInfo>> {
1140        unimplemented!()
1141    }
1142
1143    async fn list_hummock_active_write_limits(&self) -> RpcResult<HashMap<u64, WriteLimit>> {
1144        unimplemented!()
1145    }
1146
1147    async fn list_hummock_meta_configs(&self) -> RpcResult<HashMap<String, String>> {
1148        unimplemented!()
1149    }
1150
1151    async fn list_event_log(&self) -> RpcResult<Vec<EventLog>> {
1152        unimplemented!()
1153    }
1154
1155    async fn list_compact_task_assignment(&self) -> RpcResult<Vec<CompactTaskAssignment>> {
1156        unimplemented!()
1157    }
1158
1159    async fn list_all_nodes(&self) -> RpcResult<Vec<WorkerNode>> {
1160        Ok(vec![])
1161    }
1162
1163    async fn list_compact_task_progress(&self) -> RpcResult<Vec<CompactTaskProgress>> {
1164        unimplemented!()
1165    }
1166
1167    async fn recover(&self) -> RpcResult<()> {
1168        unimplemented!()
1169    }
1170
1171    async fn apply_throttle(
1172        &self,
1173        _kind: PbThrottleTarget,
1174        _id: u32,
1175        _rate_limit: Option<u32>,
1176    ) -> RpcResult<()> {
1177        unimplemented!()
1178    }
1179
1180    async fn get_cluster_recovery_status(&self) -> RpcResult<RecoveryStatus> {
1181        Ok(RecoveryStatus::StatusRunning)
1182    }
1183
1184    async fn get_cluster_limits(&self) -> RpcResult<Vec<ClusterLimit>> {
1185        Ok(vec![])
1186    }
1187
1188    async fn list_rate_limits(&self) -> RpcResult<Vec<RateLimitInfo>> {
1189        Ok(vec![])
1190    }
1191
1192    async fn list_cdc_progress(&self) -> RpcResult<HashMap<u32, PbCdcProgress>> {
1193        Ok(HashMap::default())
1194    }
1195
1196    async fn get_meta_store_endpoint(&self) -> RpcResult<String> {
1197        unimplemented!()
1198    }
1199
1200    async fn alter_sink_props(
1201        &self,
1202        _sink_id: u32,
1203        _changed_props: BTreeMap<String, String>,
1204        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1205        _connector_conn_ref: Option<u32>,
1206    ) -> RpcResult<()> {
1207        unimplemented!()
1208    }
1209
1210    async fn alter_iceberg_table_props(
1211        &self,
1212        _table_id: u32,
1213        _sink_id: u32,
1214        _source_id: u32,
1215        _changed_props: BTreeMap<String, String>,
1216        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1217        _connector_conn_ref: Option<u32>,
1218    ) -> RpcResult<()> {
1219        unimplemented!()
1220    }
1221
1222    async fn alter_source_connector_props(
1223        &self,
1224        _source_id: u32,
1225        _changed_props: BTreeMap<String, String>,
1226        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1227        _connector_conn_ref: Option<u32>,
1228    ) -> RpcResult<()> {
1229        unimplemented!()
1230    }
1231
1232    async fn list_hosted_iceberg_tables(&self) -> RpcResult<Vec<IcebergTable>> {
1233        unimplemented!()
1234    }
1235
1236    async fn get_fragment_by_id(
1237        &self,
1238        _fragment_id: u32,
1239    ) -> RpcResult<Option<FragmentDistribution>> {
1240        unimplemented!()
1241    }
1242
1243    fn worker_id(&self) -> u32 {
1244        0
1245    }
1246
1247    async fn set_sync_log_store_aligned(&self, _job_id: u32, _aligned: bool) -> RpcResult<()> {
1248        Ok(())
1249    }
1250
1251    async fn compact_iceberg_table(&self, _sink_id: SinkId) -> RpcResult<u64> {
1252        Ok(1)
1253    }
1254
1255    async fn expire_iceberg_table_snapshots(&self, _sink_id: SinkId) -> RpcResult<()> {
1256        Ok(())
1257    }
1258
1259    async fn refresh(&self, _request: RefreshRequest) -> RpcResult<RefreshResponse> {
1260        Ok(RefreshResponse { status: None })
1261    }
1262}
1263
1264#[cfg(test)]
1265pub static PROTO_FILE_DATA: &str = r#"
1266    syntax = "proto3";
1267    package test;
1268    message TestRecord {
1269      int32 id = 1;
1270      Country country = 3;
1271      int64 zipcode = 4;
1272      float rate = 5;
1273    }
1274    message TestRecordAlterType {
1275        string id = 1;
1276        Country country = 3;
1277        int32 zipcode = 4;
1278        float rate = 5;
1279      }
1280    message TestRecordExt {
1281      int32 id = 1;
1282      Country country = 3;
1283      int64 zipcode = 4;
1284      float rate = 5;
1285      string name = 6;
1286    }
1287    message Country {
1288      string address = 1;
1289      City city = 2;
1290      string zipcode = 3;
1291    }
1292    message City {
1293      string address = 1;
1294      string zipcode = 2;
1295    }"#;
1296
1297pub fn create_proto_file(proto_data: &str) -> NamedTempFile {
1300    let in_file = Builder::new()
1301        .prefix("temp")
1302        .suffix(".proto")
1303        .rand_bytes(8)
1304        .tempfile()
1305        .unwrap();
1306
1307    let out_file = Builder::new()
1308        .prefix("temp")
1309        .suffix(".pb")
1310        .rand_bytes(8)
1311        .tempfile()
1312        .unwrap();
1313
1314    let mut file = in_file.as_file();
1315    file.write_all(proto_data.as_ref())
1316        .expect("writing binary to test file");
1317    file.flush().expect("flush temp file failed");
1318    let include_path = in_file
1319        .path()
1320        .parent()
1321        .unwrap()
1322        .to_string_lossy()
1323        .into_owned();
1324    let out_path = out_file.path().to_string_lossy().into_owned();
1325    let in_path = in_file.path().to_string_lossy().into_owned();
1326    let mut compile = std::process::Command::new("protoc");
1327
1328    let out = compile
1329        .arg("--include_imports")
1330        .arg("-I")
1331        .arg(include_path)
1332        .arg(format!("--descriptor_set_out={}", out_path))
1333        .arg(in_path)
1334        .output()
1335        .expect("failed to compile proto");
1336    if !out.status.success() {
1337        panic!("compile proto failed \n output: {:?}", out);
1338    }
1339    out_file
1340}