risingwave_frontend/test_utils.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::io::Write;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};

use futures_async_stream::for_await;
use parking_lot::RwLock;
use pgwire::net::{Address, AddressRef};
use pgwire::pg_response::StatementType;
use pgwire::pg_server::{SessionId, SessionManager, UserAuthenticator};
use pgwire::types::Row;
use risingwave_common::catalog::{
    AlterDatabaseParam, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER,
    DEFAULT_SUPER_USER_FOR_ADMIN, DEFAULT_SUPER_USER_FOR_ADMIN_ID, DEFAULT_SUPER_USER_ID,
    FunctionId, IndexId, NON_RESERVED_USER_ID, ObjectId, PG_CATALOG_SCHEMA_NAME,
    RW_CATALOG_SCHEMA_NAME, TableId,
};
use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat};
use risingwave_common::id::{ConnectionId, JobId, SourceId, SubscriptionId, ViewId, WorkerId};
use risingwave_common::session_config::SessionConfig;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::util::cluster_limit::ClusterLimit;
use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{HummockVersionId, INVALID_VERSION_ID};
use risingwave_pb::backup_service::MetaSnapshotMetadata;
use risingwave_pb::catalog::{
    PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbStreamJobStatus,
    PbSubscription, PbTable, PbView, Table,
};
use risingwave_pb::common::WorkerNode;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
use risingwave_pb::ddl_service::{
    DdlProgress, PbTableJobType, TableJobType, alter_name_request, alter_set_schema_request,
    alter_swap_rename_request, create_connection_request, streaming_job_resource_type,
};
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::{
    BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
};
use risingwave_pb::id::ActorId;
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::{
    EventLog, FragmentDistribution, PbTableParallelism, PbThrottleTarget, RecoveryStatus,
    RefreshRequest, RefreshResponse, SystemParams,
};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
use risingwave_pb::user::update_user_request::UpdateField;
use risingwave_pb::user::{GrantPrivilege, UserInfo};
use risingwave_rpc_client::error::Result as RpcResult;
use tempfile::{Builder, NamedTempFile};

use crate::FrontendOpts;
use crate::catalog::catalog_service::CatalogWriter;
use crate::catalog::root_catalog::Catalog;
use crate::catalog::{DatabaseId, FragmentId, SchemaId, SecretId, SinkId};
use crate::error::{ErrorCode, Result, RwError};
use crate::handler::RwPgResponse;
use crate::meta_client::FrontendMetaClient;
use crate::scheduler::HummockSnapshotManagerRef;
use crate::session::{AuthContext, FrontendEnv, SessionImpl};
use crate::user::UserId;
use crate::user::user_manager::UserInfoManager;
use crate::user::user_service::UserInfoWriter;

/// An embedded frontend that starts neither a meta node nor a frontend TCP server.
pub struct LocalFrontend {
    pub opts: FrontendOpts,
    env: FrontendEnv,
}

impl SessionManager for LocalFrontend {
    type Error = RwError;
    type Session = SessionImpl;

    fn create_dummy_session(
        &self,
        _database_id: DatabaseId,
    ) -> std::result::Result<Arc<Self::Session>, Self::Error> {
        unreachable!()
    }

    fn connect(
        &self,
        _database: &str,
        _user_name: &str,
        _peer_addr: AddressRef,
    ) -> std::result::Result<Arc<Self::Session>, Self::Error> {
        Ok(self.session_ref())
    }

    fn cancel_queries_in_session(&self, _session_id: SessionId) {
        unreachable!()
    }

    fn cancel_creating_jobs_in_session(&self, _session_id: SessionId) {
        unreachable!()
    }

    fn end_session(&self, _session: &Self::Session) {
        unreachable!()
    }
}

impl LocalFrontend {
    #[expect(clippy::unused_async)]
    pub async fn new(opts: FrontendOpts) -> Self {
        let env = FrontendEnv::mock();
        Self { opts, env }
    }

    pub async fn run_sql(
        &self,
        sql: impl Into<String>,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        self.session_ref().run_statement(sql, vec![]).await
    }

    pub async fn run_sql_with_session(
        &self,
        session_ref: Arc<SessionImpl>,
        sql: impl Into<String>,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        session_ref.run_statement(sql, vec![]).await
    }

    pub async fn run_user_sql(
        &self,
        sql: impl Into<String>,
        database: String,
        user_name: String,
        user_id: UserId,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        self.session_user_ref(database, user_name, user_id)
            .run_statement(sql, vec![])
            .await
    }

    pub async fn query_formatted_result(&self, sql: impl Into<String>) -> Vec<String> {
        let mut rsp = self.run_sql(sql).await.unwrap();
        let mut res = vec![];
        #[for_await]
        for row_set in rsp.values_stream() {
            for row in row_set.unwrap() {
                res.push(format!("{:?}", row));
            }
        }
        res
    }

    pub async fn get_explain_output(&self, sql: impl Into<String>) -> String {
        let mut rsp = self.run_sql(sql).await.unwrap();
        assert_eq!(rsp.stmt_type(), StatementType::EXPLAIN);
        let mut res = String::new();
        #[for_await]
        for row_set in rsp.values_stream() {
            for row in row_set.unwrap() {
                let row: Row = row;
                let row = row.values()[0].as_ref().unwrap();
                res += std::str::from_utf8(row).unwrap();
                res += "\n";
            }
        }
        res
    }

    /// Creates a new session for the default super user on the default database.
    pub fn session_ref(&self) -> Arc<SessionImpl> {
        self.session_user_ref(
            DEFAULT_DATABASE_NAME.to_owned(),
            DEFAULT_SUPER_USER.to_owned(),
            DEFAULT_SUPER_USER_ID,
        )
    }

    pub fn session_user_ref(
        &self,
        database: String,
        user_name: String,
        user_id: UserId,
    ) -> Arc<SessionImpl> {
        Arc::new(SessionImpl::new(
            self.env.clone(),
            AuthContext::new(database, user_name, user_id),
            UserAuthenticator::None,
            // `LocalFrontend` uses a placeholder session id.
            (0, 0),
            Address::Tcp(SocketAddr::new(
                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
                6666,
            ))
            .into(),
            Default::default(),
        ))
    }
}
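
// A minimal usage sketch for `LocalFrontend` (illustrative only; assumes a test
// context in which `FrontendOpts::default()` is an acceptable configuration):
//
//     let frontend = LocalFrontend::new(FrontendOpts::default()).await;
//     frontend.run_sql("CREATE TABLE t (v INT)").await.unwrap();
//     let plan = frontend.get_explain_output("EXPLAIN SELECT v FROM t").await;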

pub async fn get_explain_output(mut rsp: RwPgResponse) -> String {
    if rsp.stmt_type() != StatementType::EXPLAIN {
        panic!("RESPONSE INVALID: {rsp:?}");
    }
    let mut res = String::new();
    #[for_await]
    for row_set in rsp.values_stream() {
        for row in row_set.unwrap() {
            let row: Row = row;
            let row = row.values()[0].as_ref().unwrap();
            res += std::str::from_utf8(row).unwrap();
            res += "\n";
        }
    }
    res
}

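/// A mock [`CatalogWriter`] that applies DDL directly to an in-memory [`Catalog`].
/// It tracks object-to-schema and schema-to-database mappings itself so that drops
/// can be resolved without a meta service.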
pub struct MockCatalogWriter {
    catalog: Arc<RwLock<Catalog>>,
    id: AtomicU32,
    table_id_to_schema_id: RwLock<HashMap<u32, SchemaId>>,
    schema_id_to_database_id: RwLock<HashMap<SchemaId, DatabaseId>>,
    hummock_snapshot_manager: HummockSnapshotManagerRef,
}

#[async_trait::async_trait]
impl CatalogWriter for MockCatalogWriter {
    async fn create_database(
        &self,
        db_name: &str,
        owner: UserId,
        resource_group: &str,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
    ) -> Result<()> {
        let database_id = DatabaseId::new(self.gen_id());
        self.catalog.write().create_database(&PbDatabase {
            name: db_name.to_owned(),
            id: database_id,
            owner,
            resource_group: resource_group.to_owned(),
            barrier_interval_ms,
            checkpoint_frequency,
        });
        self.create_schema(database_id, DEFAULT_SCHEMA_NAME, owner)
            .await?;
        self.create_schema(database_id, PG_CATALOG_SCHEMA_NAME, owner)
            .await?;
        self.create_schema(database_id, RW_CATALOG_SCHEMA_NAME, owner)
            .await?;
        Ok(())
    }

    async fn create_schema(
        &self,
        db_id: DatabaseId,
        schema_name: &str,
        owner: UserId,
    ) -> Result<()> {
        let id = self.gen_id();
        self.catalog.write().create_schema(&PbSchema {
            id,
            name: schema_name.to_owned(),
            database_id: db_id,
            owner,
        });
        self.add_schema_id(id, db_id);
        Ok(())
    }

    async fn create_materialized_view(
        &self,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
        _dependencies: HashSet<ObjectId>,
        _resource_type: streaming_job_resource_type::ResourceType,
        _if_not_exists: bool,
    ) -> Result<()> {
        table.id = self.gen_id();
        table.stream_job_status = PbStreamJobStatus::Created as _;
        table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
        self.catalog.write().create_table(&table);
        self.add_table_or_source_id(table.id.as_raw_id(), table.schema_id, table.database_id);
        self.hummock_snapshot_manager.add_table_for_test(table.id);
        Ok(())
    }

    async fn replace_materialized_view(
        &self,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
    ) -> Result<()> {
        table.stream_job_status = PbStreamJobStatus::Created as _;
        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
        self.catalog.write().update_table(&table);
        Ok(())
    }

    async fn create_view(&self, mut view: PbView, _dependencies: HashSet<ObjectId>) -> Result<()> {
        view.id = self.gen_id();
        self.catalog.write().create_view(&view);
        self.add_table_or_source_id(view.id.as_raw_id(), view.schema_id, view.database_id);
        Ok(())
    }

    async fn create_table(
        &self,
        source: Option<PbSource>,
        mut table: PbTable,
        graph: StreamFragmentGraph,
        _job_type: PbTableJobType,
        if_not_exists: bool,
        _dependencies: HashSet<ObjectId>,
    ) -> Result<()> {
        if let Some(source) = source {
            let source_id = self.create_source_inner(source)?;
            table.optional_associated_source_id = Some(source_id.into());
        }
        self.create_materialized_view(
            table,
            graph,
            HashSet::new(),
            streaming_job_resource_type::ResourceType::Regular(true),
            if_not_exists,
        )
        .await?;
        Ok(())
    }

    async fn replace_table(
        &self,
        _source: Option<PbSource>,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
        _job_type: TableJobType,
    ) -> Result<()> {
        table.stream_job_status = PbStreamJobStatus::Created as _;
        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
        self.catalog.write().update_table(&table);
        Ok(())
    }

    async fn replace_source(&self, source: PbSource, _graph: StreamFragmentGraph) -> Result<()> {
        self.catalog.write().update_source(&source);
        Ok(())
    }

    async fn create_source(
        &self,
        source: PbSource,
        _graph: Option<StreamFragmentGraph>,
        _if_not_exists: bool,
    ) -> Result<()> {
        self.create_source_inner(source).map(|_| ())
    }

    async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        _dependencies: HashSet<ObjectId>,
        _if_not_exists: bool,
    ) -> Result<()> {
        self.create_sink_inner(sink, graph)
    }

    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
        self.create_subscription_inner(subscription)
    }

    async fn create_index(
        &self,
        mut index: PbIndex,
        mut index_table: PbTable,
        _graph: StreamFragmentGraph,
        _if_not_exists: bool,
    ) -> Result<()> {
        index_table.id = self.gen_id();
        index_table.stream_job_status = PbStreamJobStatus::Created as _;
        index_table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
        self.catalog.write().create_table(&index_table);
        self.add_table_or_index_id(
            index_table.id.as_raw_id(),
            index_table.schema_id,
            index_table.database_id,
        );

        index.id = index_table.id.as_raw_id().into();
        index.index_table_id = index_table.id;
        self.catalog.write().create_index(&index);
        Ok(())
    }

    async fn create_function(&self, _function: PbFunction) -> Result<()> {
        unreachable!()
    }

    async fn create_connection(
        &self,
        _connection_name: String,
        _database_id: DatabaseId,
        _schema_id: SchemaId,
        _owner_id: u32,
        _connection: create_connection_request::Payload,
    ) -> Result<()> {
        unreachable!()
    }

    async fn create_secret(
        &self,
        _secret_name: String,
        _database_id: DatabaseId,
        _schema_id: SchemaId,
        _owner_id: u32,
        _payload: Vec<u8>,
    ) -> Result<()> {
        unreachable!()
    }

    async fn comment_on(&self, _comment: PbComment) -> Result<()> {
        unreachable!()
    }

    async fn drop_table(
        &self,
        source_id: Option<u32>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        if let Some(source_id) = source_id {
            self.drop_table_or_source_id(source_id);
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
        let indexes =
            self.catalog
                .read()
                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
        for index in indexes {
            self.drop_index(index.id, cascade).await?;
        }
        self.catalog
            .write()
            .drop_table(database_id, schema_id, table_id);
        if let Some(source_id) = source_id {
            self.catalog
                .write()
                .drop_source(database_id, schema_id, source_id.into());
        }
        Ok(())
    }

    async fn drop_view(&self, _view_id: ViewId, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
        let indexes =
            self.catalog
                .read()
                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
        for index in indexes {
            self.drop_index(index.id, cascade).await?;
        }
        self.catalog
            .write()
            .drop_table(database_id, schema_id, table_id);
        Ok(())
    }

    async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(source_id.as_raw_id());
        self.catalog
            .write()
            .drop_source(database_id, schema_id, source_id);
        Ok(())
    }

    async fn reset_source(&self, _source_id: SourceId) -> Result<()> {
        Ok(())
    }

    async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_sink_id(sink_id.as_raw_id());
        self.catalog
            .write()
            .drop_sink(database_id, schema_id, sink_id);
        Ok(())
    }

    async fn drop_subscription(
        &self,
        subscription_id: SubscriptionId,
        cascade: bool,
    ) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) =
            self.drop_table_or_subscription_id(subscription_id.as_raw_id());
        self.catalog
            .write()
            .drop_subscription(database_id, schema_id, subscription_id);
        Ok(())
    }

    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let &schema_id = self
            .table_id_to_schema_id
            .read()
            .get(&index_id.as_raw_id())
            .unwrap();
        let database_id = self.get_database_id_by_schema(schema_id);

        let index = {
            let catalog_reader = self.catalog.read();
            let schema_catalog = catalog_reader
                .get_schema_by_id(database_id, schema_id)
                .unwrap();
            schema_catalog.get_index_by_id(index_id).unwrap().clone()
        };

        let index_table_id = index.index_table().id;
        let (database_id, schema_id) = self.drop_table_or_index_id(index_id.as_raw_id());
        self.catalog
            .write()
            .drop_index(database_id, schema_id, index_id);
        self.catalog
            .write()
            .drop_table(database_id, schema_id, index_table_id);
        Ok(())
    }

    async fn drop_function(&self, _function_id: FunctionId, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_connection(&self, _connection_id: ConnectionId, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_secret(&self, _secret_id: SecretId) -> Result<()> {
        unreachable!()
    }

    async fn drop_database(&self, database_id: DatabaseId) -> Result<()> {
        self.catalog.write().drop_database(database_id);
        Ok(())
    }

    async fn drop_schema(&self, schema_id: SchemaId, _cascade: bool) -> Result<()> {
        let database_id = self.drop_schema_id(schema_id);
        self.catalog.write().drop_schema(database_id, schema_id);
        Ok(())
    }

    async fn alter_name(
        &self,
        object_id: alter_name_request::Object,
        object_name: &str,
    ) -> Result<()> {
        match object_id {
            alter_name_request::Object::TableId(table_id) => {
                self.catalog
                    .write()
                    .alter_table_name_by_id(table_id.into(), object_name);
                Ok(())
            }
            _ => {
                unimplemented!()
            }
        }
    }

    async fn alter_source(&self, source: PbSource) -> Result<()> {
        self.catalog.write().update_source(&source);
        Ok(())
    }

    async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<()> {
        for database in self.catalog.read().iter_databases() {
            for schema in database.iter_schemas() {
                match object {
                    Object::TableId(table_id) => {
                        if let Some(table) = schema.get_created_table_by_id(TableId::from(table_id))
                        {
                            let mut pb_table = table.to_prost();
                            pb_table.owner = owner_id;
                            self.catalog.write().update_table(&pb_table);
                            return Ok(());
                        }
                    }
                    _ => unreachable!(),
                }
            }
        }

        Err(ErrorCode::ItemNotFound(format!("object not found: {:?}", object)).into())
    }

    async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> Result<()> {
        match object {
            alter_set_schema_request::Object::TableId(table_id) => {
                let mut pb_table = {
                    let reader = self.catalog.read();
                    let table = reader.get_any_table_by_id(table_id.into())?.to_owned();
                    table.to_prost()
                };
                pb_table.schema_id = new_schema_id;
                self.catalog.write().update_table(&pb_table);
                self.table_id_to_schema_id
                    .write()
                    .insert(table_id, new_schema_id);
                Ok(())
            }
            _ => unreachable!(),
        }
    }

    async fn alter_parallelism(
        &self,
        _job_id: JobId,
        _parallelism: PbTableParallelism,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }

    async fn alter_config(
        &self,
        _job_id: JobId,
        _entries_to_add: HashMap<String, String>,
        _keys_to_remove: Vec<String>,
    ) -> Result<()> {
        todo!()
    }

    async fn alter_swap_rename(&self, _object: alter_swap_rename_request::Object) -> Result<()> {
        todo!()
    }

    async fn alter_secret(
        &self,
        _secret_id: SecretId,
        _secret_name: String,
        _database_id: DatabaseId,
        _schema_id: SchemaId,
        _owner_id: u32,
        _payload: Vec<u8>,
    ) -> Result<()> {
        unreachable!()
    }

    async fn alter_resource_group(
        &self,
        _table_id: TableId,
        _resource_group: Option<String>,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }

    async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> Result<()> {
        let mut pb_database = {
            let reader = self.catalog.read();
            let database = reader.get_database_by_id(database_id)?.to_owned();
            database.to_prost()
        };
        match param {
            AlterDatabaseParam::BarrierIntervalMs(interval) => {
                pb_database.barrier_interval_ms = interval;
            }
            AlterDatabaseParam::CheckpointFrequency(frequency) => {
                pb_database.checkpoint_frequency = frequency;
            }
        }
        self.catalog.write().update_database(&pb_database);
        Ok(())
    }

    async fn create_iceberg_table(
        &self,
        _table_job_info: PbTableJobInfo,
        _sink_job_info: PbSinkJobInfo,
        _iceberg_source: PbSource,
        _if_not_exists: bool,
    ) -> Result<()> {
        todo!()
    }
}

impl MockCatalogWriter {
    pub fn new(
        catalog: Arc<RwLock<Catalog>>,
        hummock_snapshot_manager: HummockSnapshotManagerRef,
    ) -> Self {
        catalog.write().create_database(&PbDatabase {
            id: 0.into(),
            name: DEFAULT_DATABASE_NAME.to_owned(),
            owner: DEFAULT_SUPER_USER_ID,
            resource_group: DEFAULT_RESOURCE_GROUP.to_owned(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        });
        catalog.write().create_schema(&PbSchema {
            id: 1.into(),
            name: DEFAULT_SCHEMA_NAME.to_owned(),
            database_id: 0.into(),
            owner: DEFAULT_SUPER_USER_ID,
        });
        catalog.write().create_schema(&PbSchema {
            id: 2.into(),
            name: PG_CATALOG_SCHEMA_NAME.to_owned(),
            database_id: 0.into(),
            owner: DEFAULT_SUPER_USER_ID,
        });
        catalog.write().create_schema(&PbSchema {
            id: 3.into(),
            name: RW_CATALOG_SCHEMA_NAME.to_owned(),
            database_id: 0.into(),
            owner: DEFAULT_SUPER_USER_ID,
        });
        let mut map: HashMap<SchemaId, DatabaseId> = HashMap::new();
        map.insert(1_u32.into(), 0_u32.into());
        map.insert(2_u32.into(), 0_u32.into());
        map.insert(3_u32.into(), 0_u32.into());
        Self {
            catalog,
            id: AtomicU32::new(3),
            table_id_to_schema_id: Default::default(),
            schema_id_to_database_id: RwLock::new(map),
            hummock_snapshot_manager,
        }
    }

    fn gen_id<T: From<u32>>(&self) -> T {
        // Id 0 is taken by the `dev` database and schema, so skip over it.
        (self.id.fetch_add(1, Ordering::SeqCst) + 1).into()
    }

    fn add_table_or_source_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn drop_table_or_source_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn add_table_or_sink_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn add_table_or_subscription_id(
        &self,
        table_id: u32,
        schema_id: SchemaId,
        _database_id: DatabaseId,
    ) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn add_table_or_index_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn drop_table_or_sink_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn drop_table_or_subscription_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn drop_table_or_index_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn add_schema_id(&self, schema_id: SchemaId, database_id: DatabaseId) {
        self.schema_id_to_database_id
            .write()
            .insert(schema_id, database_id);
    }

    fn drop_schema_id(&self, schema_id: SchemaId) -> DatabaseId {
        self.schema_id_to_database_id
            .write()
            .remove(&schema_id)
            .unwrap()
    }

    fn create_source_inner(&self, mut source: PbSource) -> Result<SourceId> {
        source.id = self.gen_id();
        self.catalog.write().create_source(&source);
        self.add_table_or_source_id(source.id.as_raw_id(), source.schema_id, source.database_id);
        Ok(source.id)
    }

    fn create_sink_inner(&self, mut sink: PbSink, _graph: StreamFragmentGraph) -> Result<()> {
        sink.id = self.gen_id();
        sink.stream_job_status = PbStreamJobStatus::Created as _;
        self.catalog.write().create_sink(&sink);
        self.add_table_or_sink_id(sink.id.as_raw_id(), sink.schema_id, sink.database_id);
        Ok(())
    }

    fn create_subscription_inner(&self, mut subscription: PbSubscription) -> Result<()> {
        subscription.id = self.gen_id();
        self.catalog.write().create_subscription(&subscription);
        self.add_table_or_subscription_id(
            subscription.id.as_raw_id(),
            subscription.schema_id,
            subscription.database_id,
        );
        Ok(())
    }

    fn get_database_id_by_schema(&self, schema_id: SchemaId) -> DatabaseId {
        *self
            .schema_id_to_database_id
            .read()
            .get(&schema_id)
            .unwrap()
    }
}

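/// A mock [`UserInfoWriter`] that applies user DDL directly to an in-memory
/// [`UserInfoManager`], pre-populated with the default super users.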
pub struct MockUserInfoWriter {
    id: AtomicU32,
    user_info: Arc<RwLock<UserInfoManager>>,
}

#[async_trait::async_trait]
impl UserInfoWriter for MockUserInfoWriter {
    async fn create_user(&self, user: UserInfo) -> Result<()> {
        let mut user = user;
        user.id = self.gen_id();
        self.user_info.write().create_user(user);
        Ok(())
    }

    async fn drop_user(&self, id: UserId) -> Result<()> {
        self.user_info.write().drop_user(id);
        Ok(())
    }

    async fn update_user(
        &self,
        update_user: UserInfo,
        update_fields: Vec<UpdateField>,
    ) -> Result<()> {
        let mut lock = self.user_info.write();
        let id = update_user.get_id();
        let Some(old_name) = lock.get_user_name_by_id(id) else {
            return Ok(());
        };
        let mut user_info = lock.get_user_by_name(&old_name).unwrap().to_prost();
        update_fields.into_iter().for_each(|field| match field {
            UpdateField::Super => user_info.is_super = update_user.is_super,
            UpdateField::Login => user_info.can_login = update_user.can_login,
            UpdateField::CreateDb => user_info.can_create_db = update_user.can_create_db,
            UpdateField::CreateUser => user_info.can_create_user = update_user.can_create_user,
            UpdateField::AuthInfo => user_info.auth_info.clone_from(&update_user.auth_info),
            UpdateField::Rename => user_info.name.clone_from(&update_user.name),
            UpdateField::Admin => user_info.is_admin = update_user.is_admin,
            UpdateField::Unspecified => unreachable!(),
        });
        // Apply the merged info so that only the fields listed in
        // `update_fields` change, rather than overwriting the whole user.
        lock.update_user(user_info);
        Ok(())
    }

    /// `MockUserInfoWriter` does not support expanding privileges with
    /// `GrantAllTables` or `GrantAllSources` when granting privileges to a user.
    async fn grant_privilege(
        &self,
        users: Vec<UserId>,
        privileges: Vec<GrantPrivilege>,
        with_grant_option: bool,
        _grantor: UserId,
    ) -> Result<()> {
        let privileges = privileges
            .into_iter()
            .map(|mut p| {
                p.action_with_opts
                    .iter_mut()
                    .for_each(|ao| ao.with_grant_option = with_grant_option);
                p
            })
            .collect::<Vec<_>>();
        for user_id in users {
            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
                u.extend_privileges(privileges.clone());
            }
        }
        Ok(())
    }

    /// `MockUserInfoWriter` does not support expanding privileges with
    /// `RevokeAllTables` or `RevokeAllSources` when revoking privileges from a user.
    async fn revoke_privilege(
        &self,
        users: Vec<UserId>,
        privileges: Vec<GrantPrivilege>,
        _granted_by: UserId,
        _revoke_by: UserId,
        revoke_grant_option: bool,
        _cascade: bool,
    ) -> Result<()> {
        for user_id in users {
            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
                u.revoke_privileges(privileges.clone(), revoke_grant_option);
            }
        }
        Ok(())
    }

    async fn alter_default_privilege(
        &self,
        _users: Vec<UserId>,
        _database_id: DatabaseId,
        _schemas: Vec<SchemaId>,
        _operation: AlterDefaultPrivilegeOperation,
        _operated_by: UserId,
    ) -> Result<()> {
        todo!()
    }
}

impl MockUserInfoWriter {
    pub fn new(user_info: Arc<RwLock<UserInfoManager>>) -> Self {
        user_info.write().create_user(UserInfo {
            id: DEFAULT_SUPER_USER_ID,
            name: DEFAULT_SUPER_USER.to_owned(),
            is_super: true,
            can_create_db: true,
            can_create_user: true,
            can_login: true,
            ..Default::default()
        });
        user_info.write().create_user(UserInfo {
            id: DEFAULT_SUPER_USER_FOR_ADMIN_ID,
            name: DEFAULT_SUPER_USER_FOR_ADMIN.to_owned(),
            is_super: true,
            can_create_db: true,
            can_create_user: true,
            can_login: true,
            is_admin: true,
            ..Default::default()
        });
        Self {
            user_info,
            id: AtomicU32::new(NON_RESERVED_USER_ID as u32),
        }
    }

    fn gen_id(&self) -> u32 {
        self.id.fetch_add(1, Ordering::SeqCst)
    }
}

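/// A mock [`FrontendMetaClient`] that serves canned responses; RPCs that the
/// tests never exercise are left `unimplemented!()`.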
pub struct MockFrontendMetaClient {}

#[async_trait::async_trait]
impl FrontendMetaClient for MockFrontendMetaClient {
    async fn try_unregister(&self) {}

    async fn flush(&self, _database_id: DatabaseId) -> RpcResult<HummockVersionId> {
        Ok(INVALID_VERSION_ID)
    }

    async fn wait(&self) -> RpcResult<()> {
        Ok(())
    }

    async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult<Vec<u32>> {
        Ok(vec![])
    }

    async fn list_table_fragments(
        &self,
        _table_ids: &[JobId],
    ) -> RpcResult<HashMap<JobId, TableFragmentInfo>> {
        Ok(HashMap::default())
    }

    async fn list_streaming_job_states(&self) -> RpcResult<Vec<StreamingJobState>> {
        Ok(vec![])
    }

    async fn list_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
        Ok(vec![])
    }

    async fn list_creating_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
        Ok(vec![])
    }

    async fn list_actor_states(&self) -> RpcResult<Vec<ActorState>> {
        Ok(vec![])
    }

    async fn list_actor_splits(&self) -> RpcResult<Vec<ActorSplit>> {
        Ok(vec![])
    }

    async fn list_object_dependencies(&self) -> RpcResult<Vec<PbObjectDependencies>> {
        Ok(vec![])
    }

    async fn list_meta_snapshots(&self) -> RpcResult<Vec<MetaSnapshotMetadata>> {
        Ok(vec![])
    }

    async fn set_system_param(
        &self,
        _param: String,
        _value: Option<String>,
    ) -> RpcResult<Option<SystemParamsReader>> {
        Ok(Some(SystemParams::default().into()))
    }

    async fn get_session_params(&self) -> RpcResult<SessionConfig> {
        Ok(Default::default())
    }

    async fn set_session_param(&self, _param: String, _value: Option<String>) -> RpcResult<String> {
        Ok("".to_owned())
    }

    async fn get_ddl_progress(&self) -> RpcResult<Vec<DdlProgress>> {
        Ok(vec![])
    }

    async fn get_tables(
        &self,
        _table_ids: Vec<crate::catalog::TableId>,
        _include_dropped_tables: bool,
    ) -> RpcResult<HashMap<crate::catalog::TableId, Table>> {
        Ok(HashMap::new())
    }

    async fn list_hummock_pinned_versions(&self) -> RpcResult<Vec<(WorkerId, u64)>> {
        unimplemented!()
    }

    async fn list_refresh_table_states(&self) -> RpcResult<Vec<RefreshTableState>> {
        unimplemented!()
    }

    async fn get_hummock_current_version(&self) -> RpcResult<HummockVersion> {
        Ok(HummockVersion::default())
    }

    async fn get_hummock_checkpoint_version(&self) -> RpcResult<HummockVersion> {
        unimplemented!()
    }

    async fn list_version_deltas(&self) -> RpcResult<Vec<HummockVersionDelta>> {
        unimplemented!()
    }

    async fn list_branched_objects(&self) -> RpcResult<Vec<BranchedObject>> {
        unimplemented!()
    }

    async fn list_hummock_compaction_group_configs(&self) -> RpcResult<Vec<CompactionGroupInfo>> {
        unimplemented!()
    }

    async fn list_hummock_active_write_limits(&self) -> RpcResult<HashMap<u64, WriteLimit>> {
        unimplemented!()
    }

    async fn list_hummock_meta_configs(&self) -> RpcResult<HashMap<String, String>> {
        unimplemented!()
    }

    async fn list_event_log(&self) -> RpcResult<Vec<EventLog>> {
        Ok(vec![])
    }

    async fn list_compact_task_assignment(&self) -> RpcResult<Vec<CompactTaskAssignment>> {
        unimplemented!()
    }

    async fn list_all_nodes(&self) -> RpcResult<Vec<WorkerNode>> {
        Ok(vec![])
    }

    async fn list_compact_task_progress(&self) -> RpcResult<Vec<CompactTaskProgress>> {
        unimplemented!()
    }

    async fn recover(&self) -> RpcResult<()> {
        unimplemented!()
    }

    async fn apply_throttle(
        &self,
        _throttle_target: PbThrottleTarget,
        _throttle_type: risingwave_pb::common::PbThrottleType,
        _id: u32,
        _rate_limit: Option<u32>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn alter_fragment_parallelism(
        &self,
        _fragment_ids: Vec<FragmentId>,
        _parallelism: Option<PbTableParallelism>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn get_cluster_recovery_status(&self) -> RpcResult<RecoveryStatus> {
        Ok(RecoveryStatus::StatusRunning)
    }

    async fn get_cluster_limits(&self) -> RpcResult<Vec<ClusterLimit>> {
        Ok(vec![])
    }

    async fn list_rate_limits(&self) -> RpcResult<Vec<RateLimitInfo>> {
        Ok(vec![])
    }

    async fn list_cdc_progress(&self) -> RpcResult<HashMap<JobId, PbCdcProgress>> {
        Ok(HashMap::default())
    }

    async fn get_meta_store_endpoint(&self) -> RpcResult<String> {
        unimplemented!()
    }

    async fn alter_sink_props(
        &self,
        _sink_id: SinkId,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
        _connector_conn_ref: Option<ConnectionId>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn alter_iceberg_table_props(
        &self,
        _table_id: TableId,
        _sink_id: SinkId,
        _source_id: SourceId,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
        _connector_conn_ref: Option<ConnectionId>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn alter_source_connector_props(
        &self,
        _source_id: SourceId,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
        _connector_conn_ref: Option<ConnectionId>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn alter_connection_connector_props(
        &self,
        _connection_id: u32,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
    ) -> RpcResult<()> {
        Ok(())
    }

    async fn list_hosted_iceberg_tables(&self) -> RpcResult<Vec<IcebergTable>> {
        unimplemented!()
    }

    async fn get_fragment_by_id(
        &self,
        _fragment_id: FragmentId,
    ) -> RpcResult<Option<FragmentDistribution>> {
        unimplemented!()
    }

    async fn get_fragment_vnodes(
        &self,
        _fragment_id: FragmentId,
    ) -> RpcResult<Vec<(ActorId, Vec<u32>)>> {
        unimplemented!()
    }

    async fn get_actor_vnodes(&self, _actor_id: ActorId) -> RpcResult<Vec<u32>> {
        unimplemented!()
    }

    fn worker_id(&self) -> WorkerId {
        0.into()
    }

    async fn set_sync_log_store_aligned(&self, _job_id: JobId, _aligned: bool) -> RpcResult<()> {
        Ok(())
    }

    async fn compact_iceberg_table(&self, _sink_id: SinkId) -> RpcResult<u64> {
        Ok(1)
    }

    async fn expire_iceberg_table_snapshots(&self, _sink_id: SinkId) -> RpcResult<()> {
        Ok(())
    }

    async fn refresh(&self, _request: RefreshRequest) -> RpcResult<RefreshResponse> {
        Ok(RefreshResponse { status: None })
    }

    fn cluster_id(&self) -> &str {
        "test-cluster-uuid"
    }

    async fn list_unmigrated_tables(&self) -> RpcResult<HashMap<crate::catalog::TableId, String>> {
        unimplemented!()
    }
}

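// Protobuf schema used by tests. `TestRecordAlterType` changes the types of
// `TestRecord`'s fields and `TestRecordExt` adds a field, presumably to
// exercise schema-change handling.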
#[cfg(test)]
pub static PROTO_FILE_DATA: &str = r#"
    syntax = "proto3";
    package test;
    message TestRecord {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
    }
    message TestRecordAlterType {
      string id = 1;
      Country country = 3;
      int32 zipcode = 4;
      float rate = 5;
    }
    message TestRecordExt {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
      string name = 6;
    }
    message Country {
      string address = 1;
      City city = 2;
      string zipcode = 3;
    }
    message City {
      string address = 1;
      string zipcode = 2;
    }"#;

/// Compiles the given Protobuf schema with `protoc` and returns the resulting
/// descriptor-set file.
/// (`NamedTempFile` will automatically delete the file when it goes out of scope.)
pub fn create_proto_file(proto_data: &str) -> NamedTempFile {
    let in_file = Builder::new()
        .prefix("temp")
        .suffix(".proto")
        .rand_bytes(8)
        .tempfile()
        .unwrap();

    let out_file = Builder::new()
        .prefix("temp")
        .suffix(".pb")
        .rand_bytes(8)
        .tempfile()
        .unwrap();

    // Write the schema to the temporary `.proto` file.
    let mut file = in_file.as_file();
    file.write_all(proto_data.as_ref())
        .expect("writing binary to test file");
    file.flush().expect("failed to flush temp file");
    let include_path = in_file
        .path()
        .parent()
        .unwrap()
        .to_string_lossy()
        .into_owned();
    let out_path = out_file.path().to_string_lossy().into_owned();
    let in_path = in_file.path().to_string_lossy().into_owned();
    let mut compile = std::process::Command::new("protoc");

    // Compile the schema into a file descriptor set.
    let out = compile
        .arg("--include_imports")
        .arg("-I")
        .arg(include_path)
        .arg(format!("--descriptor_set_out={}", out_path))
        .arg(in_path)
        .output()
        .expect("failed to compile proto");
    if !out.status.success() {
        panic!("failed to compile proto\noutput: {:?}", out);
    }
    out_file
}
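
// A usage sketch (hypothetical; like `create_proto_file` itself, it requires
// `protoc` to be available on `PATH`):
//
//     let proto_file = create_proto_file(PROTO_FILE_DATA);
//     let schema_location = format!("file://{}", proto_file.path().display());
//     // `schema_location` can then be used wherever a test statement needs a
//     // schema URL, e.g. in a `CREATE SOURCE` definition.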