use std::collections::{BTreeMap, HashMap, HashSet};
use std::io::Write;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};

use futures_async_stream::for_await;
use parking_lot::RwLock;
use pgwire::net::{Address, AddressRef};
use pgwire::pg_response::StatementType;
use pgwire::pg_server::{BoxedError, SessionId, SessionManager, UserAuthenticator};
use pgwire::types::Row;
use risingwave_common::catalog::{
    AlterDatabaseParam, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER,
    DEFAULT_SUPER_USER_FOR_ADMIN, DEFAULT_SUPER_USER_FOR_ADMIN_ID, DEFAULT_SUPER_USER_ID,
    FunctionId, IndexId, NON_RESERVED_USER_ID, ObjectId, PG_CATALOG_SCHEMA_NAME,
    RW_CATALOG_SCHEMA_NAME, TableId,
};
use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat};
use risingwave_common::session_config::SessionConfig;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::util::cluster_limit::ClusterLimit;
use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{HummockVersionId, INVALID_VERSION_ID};
use risingwave_pb::backup_service::MetaSnapshotMetadata;
use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
use risingwave_pb::catalog::{
    PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbStreamJobStatus,
    PbSubscription, PbTable, PbView, Table,
};
use risingwave_pb::common::WorkerNode;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::{
    DdlProgress, PbTableJobType, TableJobType, alter_name_request, alter_set_schema_request,
    alter_swap_rename_request, create_connection_request,
};
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::{
    BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::{
    EventLog, FragmentDistribution, PbTableParallelism, PbThrottleTarget, RecoveryStatus,
    RefreshRequest, RefreshResponse, SystemParams,
};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
use risingwave_pb::user::update_user_request::UpdateField;
use risingwave_pb::user::{GrantPrivilege, UserInfo};
use risingwave_rpc_client::error::Result as RpcResult;
use tempfile::{Builder, NamedTempFile};

use crate::FrontendOpts;
use crate::catalog::catalog_service::CatalogWriter;
use crate::catalog::root_catalog::Catalog;
use crate::catalog::{DatabaseId, SchemaId, SecretId, SinkId};
use crate::error::{ErrorCode, Result};
use crate::handler::RwPgResponse;
use crate::meta_client::FrontendMetaClient;
use crate::scheduler::HummockSnapshotManagerRef;
use crate::session::{AuthContext, FrontendEnv, SessionImpl};
use crate::user::UserId;
use crate::user::user_manager::UserInfoManager;
use crate::user::user_service::UserInfoWriter;

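/// An embedded frontend for tests, backed by [`FrontendEnv::mock`], that can
/// run SQL statements without a running cluster.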
pub struct LocalFrontend {
    pub opts: FrontendOpts,
    env: FrontendEnv,
}

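// Only `connect` is exercised through `LocalFrontend`; the other
// `SessionManager` hooks are stubbed with `unreachable!()`.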
impl SessionManager for LocalFrontend {
    type Session = SessionImpl;

    fn create_dummy_session(
        &self,
        _database_id: u32,
        _user_name: u32,
    ) -> std::result::Result<Arc<Self::Session>, BoxedError> {
        unreachable!()
    }

    fn connect(
        &self,
        _database: &str,
        _user_name: &str,
        _peer_addr: AddressRef,
    ) -> std::result::Result<Arc<Self::Session>, BoxedError> {
        Ok(self.session_ref())
    }

    fn cancel_queries_in_session(&self, _session_id: SessionId) {
        unreachable!()
    }

    fn cancel_creating_jobs_in_session(&self, _session_id: SessionId) {
        unreachable!()
    }

    fn end_session(&self, _session: &Self::Session) {
        unreachable!()
    }
}

impl LocalFrontend {
    #[expect(clippy::unused_async)]
    pub async fn new(opts: FrontendOpts) -> Self {
        let env = FrontendEnv::mock();
        Self { opts, env }
    }

    pub async fn run_sql(
        &self,
        sql: impl Into<String>,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        self.session_ref().run_statement(sql, vec![]).await
    }

    pub async fn run_sql_with_session(
        &self,
        session_ref: Arc<SessionImpl>,
        sql: impl Into<String>,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        session_ref.run_statement(sql, vec![]).await
    }

    pub async fn run_user_sql(
        &self,
        sql: impl Into<String>,
        database: String,
        user_name: String,
        user_id: UserId,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        self.session_user_ref(database, user_name, user_id)
            .run_statement(sql, vec![])
            .await
    }

    pub async fn query_formatted_result(&self, sql: impl Into<String>) -> Vec<String> {
        let mut rsp = self.run_sql(sql).await.unwrap();
        let mut res = vec![];
        #[for_await]
        for row_set in rsp.values_stream() {
            for row in row_set.unwrap() {
                res.push(format!("{:?}", row));
            }
        }
        res
    }

    pub async fn get_explain_output(&self, sql: impl Into<String>) -> String {
        let mut rsp = self.run_sql(sql).await.unwrap();
        assert_eq!(rsp.stmt_type(), StatementType::EXPLAIN);
        let mut res = String::new();
        #[for_await]
        for row_set in rsp.values_stream() {
            for row in row_set.unwrap() {
                let row: Row = row;
                let row = row.values()[0].as_ref().unwrap();
                res += std::str::from_utf8(row).unwrap();
                res += "\n";
            }
        }
        res
    }

    pub fn session_ref(&self) -> Arc<SessionImpl> {
        self.session_user_ref(
            DEFAULT_DATABASE_NAME.to_owned(),
            DEFAULT_SUPER_USER.to_owned(),
            DEFAULT_SUPER_USER_ID,
        )
    }

    pub fn session_user_ref(
        &self,
        database: String,
        user_name: String,
        user_id: UserId,
    ) -> Arc<SessionImpl> {
        Arc::new(SessionImpl::new(
            self.env.clone(),
            AuthContext::new(database, user_name, user_id),
            UserAuthenticator::None,
            (0, 0),
            Address::Tcp(SocketAddr::new(
                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
                6666,
            ))
            .into(),
            Default::default(),
        ))
    }
}

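/// Like [`LocalFrontend::get_explain_output`], but for an already-obtained
/// response: panics unless `rsp` is an `EXPLAIN` result, then joins its rows
/// with newlines.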
pub async fn get_explain_output(mut rsp: RwPgResponse) -> String {
    if rsp.stmt_type() != StatementType::EXPLAIN {
        panic!("RESPONSE INVALID: {rsp:?}");
    }
    let mut res = String::new();
    #[for_await]
    for row_set in rsp.values_stream() {
        for row in row_set.unwrap() {
            let row: Row = row;
            let row = row.values()[0].as_ref().unwrap();
            res += std::str::from_utf8(row).unwrap();
            res += "\n";
        }
    }
    res
}

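/// A [`CatalogWriter`] that applies DDL directly to an in-memory [`Catalog`]
/// instead of talking to the meta service. Ids come from one atomic counter,
/// and the side maps remember each object's schema and each schema's database
/// so that later drops can resolve them.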
pub struct MockCatalogWriter {
    catalog: Arc<RwLock<Catalog>>,
    id: AtomicU32,
    table_id_to_schema_id: RwLock<HashMap<u32, SchemaId>>,
    schema_id_to_database_id: RwLock<HashMap<u32, DatabaseId>>,
    hummock_snapshot_manager: HummockSnapshotManagerRef,
}

#[async_trait::async_trait]
impl CatalogWriter for MockCatalogWriter {
    async fn create_database(
        &self,
        db_name: &str,
        owner: UserId,
        resource_group: &str,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
    ) -> Result<()> {
        let database_id = self.gen_id();
        self.catalog.write().create_database(&PbDatabase {
            name: db_name.to_owned(),
            id: database_id,
            owner,
            resource_group: resource_group.to_owned(),
            barrier_interval_ms,
            checkpoint_frequency,
        });
        self.create_schema(database_id, DEFAULT_SCHEMA_NAME, owner)
            .await?;
        self.create_schema(database_id, PG_CATALOG_SCHEMA_NAME, owner)
            .await?;
        self.create_schema(database_id, RW_CATALOG_SCHEMA_NAME, owner)
            .await?;
        Ok(())
    }

    async fn create_schema(
        &self,
        db_id: DatabaseId,
        schema_name: &str,
        owner: UserId,
    ) -> Result<()> {
        let id = self.gen_id();
        self.catalog.write().create_schema(&PbSchema {
            id,
            name: schema_name.to_owned(),
            database_id: db_id,
            owner,
        });
        self.add_schema_id(id, db_id);
        Ok(())
    }

    async fn create_materialized_view(
        &self,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
        _dependencies: HashSet<ObjectId>,
        _specific_resource_group: Option<String>,
        _if_not_exists: bool,
    ) -> Result<()> {
        table.id = self.gen_id();
        table.stream_job_status = PbStreamJobStatus::Created as _;
        table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
        self.catalog.write().create_table(&table);
        self.add_table_or_source_id(table.id, table.schema_id, table.database_id);
        self.hummock_snapshot_manager
            .add_table_for_test(TableId::new(table.id));
        Ok(())
    }

    async fn replace_materialized_view(
        &self,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
    ) -> Result<()> {
        table.stream_job_status = PbStreamJobStatus::Created as _;
        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
        self.catalog.write().update_table(&table);
        Ok(())
    }

    async fn create_view(&self, mut view: PbView, _dependencies: HashSet<ObjectId>) -> Result<()> {
        view.id = self.gen_id();
        self.catalog.write().create_view(&view);
        self.add_table_or_source_id(view.id, view.schema_id, view.database_id);
        Ok(())
    }

    async fn create_table(
        &self,
        source: Option<PbSource>,
        mut table: PbTable,
        graph: StreamFragmentGraph,
        _job_type: PbTableJobType,
        if_not_exists: bool,
        _dependencies: HashSet<ObjectId>,
    ) -> Result<()> {
        if let Some(source) = source {
            let source_id = self.create_source_inner(source)?;
            table.optional_associated_source_id =
                Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id));
        }
        self.create_materialized_view(table, graph, HashSet::new(), None, if_not_exists)
            .await?;
        Ok(())
    }

    async fn replace_table(
        &self,
        _source: Option<PbSource>,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
        _job_type: TableJobType,
    ) -> Result<()> {
        table.stream_job_status = PbStreamJobStatus::Created as _;
        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
        self.catalog.write().update_table(&table);
        Ok(())
    }

    async fn replace_source(&self, source: PbSource, _graph: StreamFragmentGraph) -> Result<()> {
        self.catalog.write().update_source(&source);
        Ok(())
    }

    async fn create_source(
        &self,
        source: PbSource,
        _graph: Option<StreamFragmentGraph>,
        _if_not_exists: bool,
    ) -> Result<()> {
        self.create_source_inner(source).map(|_| ())
    }

    async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        _dependencies: HashSet<ObjectId>,
        _if_not_exists: bool,
    ) -> Result<()> {
        self.create_sink_inner(sink, graph)
    }

    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
        self.create_subscription_inner(subscription)
    }

    async fn create_index(
        &self,
        mut index: PbIndex,
        mut index_table: PbTable,
        _graph: StreamFragmentGraph,
        _if_not_exists: bool,
    ) -> Result<()> {
        index_table.id = self.gen_id();
        index_table.stream_job_status = PbStreamJobStatus::Created as _;
        index_table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
        self.catalog.write().create_table(&index_table);
        self.add_table_or_index_id(
            index_table.id,
            index_table.schema_id,
            index_table.database_id,
        );

        index.id = index_table.id;
        index.index_table_id = index_table.id;
        self.catalog.write().create_index(&index);
        Ok(())
    }

    async fn create_function(&self, _function: PbFunction) -> Result<()> {
        unreachable!()
    }

    async fn create_connection(
        &self,
        _connection_name: String,
        _database_id: u32,
        _schema_id: u32,
        _owner_id: u32,
        _connection: create_connection_request::Payload,
    ) -> Result<()> {
        unreachable!()
    }

    async fn create_secret(
        &self,
        _secret_name: String,
        _database_id: u32,
        _schema_id: u32,
        _owner_id: u32,
        _payload: Vec<u8>,
    ) -> Result<()> {
        unreachable!()
    }

    async fn comment_on(&self, _comment: PbComment) -> Result<()> {
        unreachable!()
    }

    async fn drop_table(
        &self,
        source_id: Option<u32>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        if let Some(source_id) = source_id {
            self.drop_table_or_source_id(source_id);
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.table_id);
        let indexes =
            self.catalog
                .read()
                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
        for index in indexes {
            self.drop_index(index.id, cascade).await?;
        }
        self.catalog
            .write()
            .drop_table(database_id, schema_id, table_id);
        if let Some(source_id) = source_id {
            self.catalog
                .write()
                .drop_source(database_id, schema_id, source_id);
        }
        Ok(())
    }

    async fn drop_view(&self, _view_id: u32, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.table_id);
        let indexes =
            self.catalog
                .read()
                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
        for index in indexes {
            self.drop_index(index.id, cascade).await?;
        }
        self.catalog
            .write()
            .drop_table(database_id, schema_id, table_id);
        Ok(())
    }

    async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(source_id);
        self.catalog
            .write()
            .drop_source(database_id, schema_id, source_id);
        Ok(())
    }

    async fn drop_sink(&self, sink_id: u32, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_sink_id(sink_id);
        self.catalog
            .write()
            .drop_sink(database_id, schema_id, sink_id);
        Ok(())
    }

    async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_subscription_id(subscription_id);
        self.catalog
            .write()
            .drop_subscription(database_id, schema_id, subscription_id);
        Ok(())
    }

    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let &schema_id = self
            .table_id_to_schema_id
            .read()
            .get(&index_id.index_id)
            .unwrap();
        let database_id = self.get_database_id_by_schema(schema_id);

        let index = {
            let catalog_reader = self.catalog.read();
            let schema_catalog = catalog_reader
                .get_schema_by_id(&database_id, &schema_id)
                .unwrap();
            schema_catalog.get_index_by_id(&index_id).unwrap().clone()
        };

        let index_table_id = index.index_table().id;
        let (database_id, schema_id) = self.drop_table_or_index_id(index_id.index_id);
        self.catalog
            .write()
            .drop_index(database_id, schema_id, index_id);
        self.catalog
            .write()
            .drop_table(database_id, schema_id, index_table_id);
        Ok(())
    }

    async fn drop_function(&self, _function_id: FunctionId, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_connection(&self, _connection_id: u32, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_secret(&self, _secret_id: SecretId) -> Result<()> {
        unreachable!()
    }

    async fn drop_database(&self, database_id: u32) -> Result<()> {
        self.catalog.write().drop_database(database_id);
        Ok(())
    }

    async fn drop_schema(&self, schema_id: u32, _cascade: bool) -> Result<()> {
        let database_id = self.drop_schema_id(schema_id);
        self.catalog.write().drop_schema(database_id, schema_id);
        Ok(())
    }

    async fn alter_name(
        &self,
        object_id: alter_name_request::Object,
        object_name: &str,
    ) -> Result<()> {
        match object_id {
            alter_name_request::Object::TableId(table_id) => {
                self.catalog
                    .write()
                    .alter_table_name_by_id(&table_id.into(), object_name);
                Ok(())
            }
            _ => {
                unimplemented!()
            }
        }
    }

    async fn alter_source(&self, source: PbSource) -> Result<()> {
        self.catalog.write().update_source(&source);
        Ok(())
    }

    async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<()> {
        for database in self.catalog.read().iter_databases() {
            for schema in database.iter_schemas() {
                match object {
                    Object::TableId(table_id) => {
                        if let Some(table) =
                            schema.get_created_table_by_id(&TableId::from(table_id))
                        {
                            let mut pb_table = table.to_prost();
                            pb_table.owner = owner_id;
                            self.catalog.write().update_table(&pb_table);
                            return Ok(());
                        }
                    }
                    _ => unreachable!(),
                }
            }
        }

        Err(ErrorCode::ItemNotFound(format!("object not found: {:?}", object)).into())
    }

    async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: u32,
    ) -> Result<()> {
        match object {
            alter_set_schema_request::Object::TableId(table_id) => {
                let mut pb_table = {
                    let reader = self.catalog.read();
                    let table = reader.get_any_table_by_id(&table_id.into())?.to_owned();
                    table.to_prost()
                };
                pb_table.schema_id = new_schema_id;
                self.catalog.write().update_table(&pb_table);
                self.table_id_to_schema_id
                    .write()
                    .insert(table_id, new_schema_id);
                Ok(())
            }
            _ => unreachable!(),
        }
    }

    async fn alter_parallelism(
        &self,
        _table_id: u32,
        _parallelism: PbTableParallelism,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }

    async fn alter_swap_rename(&self, _object: alter_swap_rename_request::Object) -> Result<()> {
        todo!()
    }

    async fn alter_secret(
        &self,
        _secret_id: u32,
        _secret_name: String,
        _database_id: u32,
        _schema_id: u32,
        _owner_id: u32,
        _payload: Vec<u8>,
    ) -> Result<()> {
        unreachable!()
    }

    async fn alter_resource_group(
        &self,
        _table_id: u32,
        _resource_group: Option<String>,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }

    async fn alter_database_param(
        &self,
        database_id: u32,
        param: AlterDatabaseParam,
    ) -> Result<()> {
        let mut pb_database = {
            let reader = self.catalog.read();
            let database = reader.get_database_by_id(&database_id)?.to_owned();
            database.to_prost()
        };
        match param {
            AlterDatabaseParam::BarrierIntervalMs(interval) => {
                pb_database.barrier_interval_ms = interval;
            }
            AlterDatabaseParam::CheckpointFrequency(frequency) => {
                pb_database.checkpoint_frequency = frequency;
            }
        }
        self.catalog.write().update_database(&pb_database);
        Ok(())
    }
}

impl MockCatalogWriter {
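    /// Creates a writer pre-seeded with the default database (id 0) and its
    /// `public`, `pg_catalog`, and `rw_catalog` schemas (ids 1-3), so
    /// [`Self::gen_id`] starts handing out ids from 4.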
    pub fn new(
        catalog: Arc<RwLock<Catalog>>,
        hummock_snapshot_manager: HummockSnapshotManagerRef,
    ) -> Self {
        catalog.write().create_database(&PbDatabase {
            id: 0,
            name: DEFAULT_DATABASE_NAME.to_owned(),
            owner: DEFAULT_SUPER_USER_ID,
            resource_group: DEFAULT_RESOURCE_GROUP.to_owned(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        });
        catalog.write().create_schema(&PbSchema {
            id: 1,
            name: DEFAULT_SCHEMA_NAME.to_owned(),
            database_id: 0,
            owner: DEFAULT_SUPER_USER_ID,
        });
        catalog.write().create_schema(&PbSchema {
            id: 2,
            name: PG_CATALOG_SCHEMA_NAME.to_owned(),
            database_id: 0,
            owner: DEFAULT_SUPER_USER_ID,
        });
        catalog.write().create_schema(&PbSchema {
            id: 3,
            name: RW_CATALOG_SCHEMA_NAME.to_owned(),
            database_id: 0,
            owner: DEFAULT_SUPER_USER_ID,
        });
        let mut map: HashMap<u32, DatabaseId> = HashMap::new();
        map.insert(1_u32, 0_u32);
        map.insert(2_u32, 0_u32);
        map.insert(3_u32, 0_u32);
        Self {
            catalog,
            id: AtomicU32::new(3),
            table_id_to_schema_id: Default::default(),
            schema_id_to_database_id: RwLock::new(map),
            hummock_snapshot_manager,
        }
    }

    fn gen_id(&self) -> u32 {
        // `fetch_add` returns the previous value; with the counter seeded at 3,
        // ids are handed out from 4 onward, past the pre-created database (0)
        // and schemas (1-3).
        self.id.fetch_add(1, Ordering::SeqCst) + 1
    }

    fn add_table_or_source_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn drop_table_or_source_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn add_table_or_sink_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn add_table_or_subscription_id(
        &self,
        table_id: u32,
        schema_id: SchemaId,
        _database_id: DatabaseId,
    ) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn add_table_or_index_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn drop_table_or_sink_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn drop_table_or_subscription_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn drop_table_or_index_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn add_schema_id(&self, schema_id: u32, database_id: DatabaseId) {
        self.schema_id_to_database_id
            .write()
            .insert(schema_id, database_id);
    }

    fn drop_schema_id(&self, schema_id: u32) -> DatabaseId {
        self.schema_id_to_database_id
            .write()
            .remove(&schema_id)
            .unwrap()
    }

    fn create_source_inner(&self, mut source: PbSource) -> Result<u32> {
        source.id = self.gen_id();
        self.catalog.write().create_source(&source);
        self.add_table_or_source_id(source.id, source.schema_id, source.database_id);
        Ok(source.id)
    }

    fn create_sink_inner(&self, mut sink: PbSink, _graph: StreamFragmentGraph) -> Result<()> {
        sink.id = self.gen_id();
        sink.stream_job_status = PbStreamJobStatus::Created as _;
        self.catalog.write().create_sink(&sink);
        self.add_table_or_sink_id(sink.id, sink.schema_id, sink.database_id);
        Ok(())
    }

    fn create_subscription_inner(&self, mut subscription: PbSubscription) -> Result<()> {
        subscription.id = self.gen_id();
        self.catalog.write().create_subscription(&subscription);
        self.add_table_or_subscription_id(
            subscription.id,
            subscription.schema_id,
            subscription.database_id,
        );
        Ok(())
    }

    fn get_database_id_by_schema(&self, schema_id: u32) -> DatabaseId {
        *self
            .schema_id_to_database_id
            .read()
            .get(&schema_id)
            .unwrap()
    }
}

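/// A [`UserInfoWriter`] that mutates an in-memory [`UserInfoManager`], the
/// user-catalog counterpart of [`MockCatalogWriter`].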
pub struct MockUserInfoWriter {
    id: AtomicU32,
    user_info: Arc<RwLock<UserInfoManager>>,
}

#[async_trait::async_trait]
impl UserInfoWriter for MockUserInfoWriter {
    async fn create_user(&self, user: UserInfo) -> Result<()> {
        let mut user = user;
        user.id = self.gen_id();
        self.user_info.write().create_user(user);
        Ok(())
    }

    async fn drop_user(&self, id: UserId) -> Result<()> {
        self.user_info.write().drop_user(id);
        Ok(())
    }

    async fn update_user(
        &self,
        update_user: UserInfo,
        update_fields: Vec<UpdateField>,
    ) -> Result<()> {
        let mut lock = self.user_info.write();
        let id = update_user.get_id();
        let Some(old_name) = lock.get_user_name_by_id(id) else {
            return Ok(());
        };
        let mut user_info = lock.get_user_by_name(&old_name).unwrap().to_prost();
        update_fields.into_iter().for_each(|field| match field {
            UpdateField::Super => user_info.is_super = update_user.is_super,
            UpdateField::Login => user_info.can_login = update_user.can_login,
            UpdateField::CreateDb => user_info.can_create_db = update_user.can_create_db,
            UpdateField::CreateUser => user_info.can_create_user = update_user.can_create_user,
            UpdateField::AuthInfo => user_info.auth_info.clone_from(&update_user.auth_info),
            UpdateField::Rename => user_info.name.clone_from(&update_user.name),
            UpdateField::Admin => user_info.is_admin = update_user.is_admin,
            UpdateField::Unspecified => unreachable!(),
        });
        // Persist the merged record (not the raw request) so that only the
        // listed fields are changed.
        lock.update_user(user_info);
        Ok(())
    }

    async fn grant_privilege(
        &self,
        users: Vec<UserId>,
        privileges: Vec<GrantPrivilege>,
        with_grant_option: bool,
        _grantor: UserId,
    ) -> Result<()> {
        let privileges = privileges
            .into_iter()
            .map(|mut p| {
                p.action_with_opts
                    .iter_mut()
                    .for_each(|ao| ao.with_grant_option = with_grant_option);
                p
            })
            .collect::<Vec<_>>();
        for user_id in users {
            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
                u.extend_privileges(privileges.clone());
            }
        }
        Ok(())
    }

    async fn revoke_privilege(
        &self,
        users: Vec<UserId>,
        privileges: Vec<GrantPrivilege>,
        _granted_by: UserId,
        _revoke_by: UserId,
        revoke_grant_option: bool,
        _cascade: bool,
    ) -> Result<()> {
        for user_id in users {
            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
                u.revoke_privileges(privileges.clone(), revoke_grant_option);
            }
        }
        Ok(())
    }

    async fn alter_default_privilege(
        &self,
        _users: Vec<UserId>,
        _database_id: DatabaseId,
        _schemas: Vec<SchemaId>,
        _operation: AlterDefaultPrivilegeOperation,
        _operated_by: UserId,
    ) -> Result<()> {
        todo!()
    }
}

impl MockUserInfoWriter {
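    /// Seeds the default superuser and the admin superuser accounts; generated
    /// user ids start at `NON_RESERVED_USER_ID` to avoid the reserved range.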
    pub fn new(user_info: Arc<RwLock<UserInfoManager>>) -> Self {
        user_info.write().create_user(UserInfo {
            id: DEFAULT_SUPER_USER_ID,
            name: DEFAULT_SUPER_USER.to_owned(),
            is_super: true,
            can_create_db: true,
            can_create_user: true,
            can_login: true,
            ..Default::default()
        });
        user_info.write().create_user(UserInfo {
            id: DEFAULT_SUPER_USER_FOR_ADMIN_ID,
            name: DEFAULT_SUPER_USER_FOR_ADMIN.to_owned(),
            is_super: true,
            can_create_db: true,
            can_create_user: true,
            can_login: true,
            is_admin: true,
            ..Default::default()
        });
        Self {
            user_info,
            id: AtomicU32::new(NON_RESERVED_USER_ID as u32),
        }
    }

    fn gen_id(&self) -> u32 {
        self.id.fetch_add(1, Ordering::SeqCst)
    }
}

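/// A `FrontendMetaClient` stub: methods either return empty or default values,
/// or are `unimplemented!()`.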
pub struct MockFrontendMetaClient {}

#[async_trait::async_trait]
impl FrontendMetaClient for MockFrontendMetaClient {
    async fn try_unregister(&self) {}

    async fn flush(&self, _database_id: DatabaseId) -> RpcResult<HummockVersionId> {
        Ok(INVALID_VERSION_ID)
    }

    async fn wait(&self) -> RpcResult<()> {
        Ok(())
    }

    async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult<Vec<u32>> {
        Ok(vec![])
    }

    async fn list_table_fragments(
        &self,
        _table_ids: &[u32],
    ) -> RpcResult<HashMap<u32, TableFragmentInfo>> {
        Ok(HashMap::default())
    }

    async fn list_streaming_job_states(&self) -> RpcResult<Vec<StreamingJobState>> {
        Ok(vec![])
    }

    async fn list_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
        Ok(vec![])
    }

    async fn list_creating_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
        Ok(vec![])
    }

    async fn list_actor_states(&self) -> RpcResult<Vec<ActorState>> {
        Ok(vec![])
    }

    async fn list_actor_splits(&self) -> RpcResult<Vec<ActorSplit>> {
        Ok(vec![])
    }

    async fn list_object_dependencies(&self) -> RpcResult<Vec<PbObjectDependencies>> {
        Ok(vec![])
    }

    async fn list_meta_snapshots(&self) -> RpcResult<Vec<MetaSnapshotMetadata>> {
        Ok(vec![])
    }

    async fn set_system_param(
        &self,
        _param: String,
        _value: Option<String>,
    ) -> RpcResult<Option<SystemParamsReader>> {
        Ok(Some(SystemParams::default().into()))
    }

    async fn get_session_params(&self) -> RpcResult<SessionConfig> {
        Ok(Default::default())
    }

    async fn set_session_param(&self, _param: String, _value: Option<String>) -> RpcResult<String> {
        Ok("".to_owned())
    }

    async fn get_ddl_progress(&self) -> RpcResult<Vec<DdlProgress>> {
        Ok(vec![])
    }

    async fn get_tables(
        &self,
        _table_ids: &[u32],
        _include_dropped_tables: bool,
    ) -> RpcResult<HashMap<u32, Table>> {
        Ok(HashMap::new())
    }

    async fn list_hummock_pinned_versions(&self) -> RpcResult<Vec<(u32, u64)>> {
        unimplemented!()
    }

    async fn get_hummock_current_version(&self) -> RpcResult<HummockVersion> {
        Ok(HummockVersion::default())
    }

    async fn get_hummock_checkpoint_version(&self) -> RpcResult<HummockVersion> {
        unimplemented!()
    }

    async fn list_version_deltas(&self) -> RpcResult<Vec<HummockVersionDelta>> {
        unimplemented!()
    }

    async fn list_branched_objects(&self) -> RpcResult<Vec<BranchedObject>> {
        unimplemented!()
    }

    async fn list_hummock_compaction_group_configs(&self) -> RpcResult<Vec<CompactionGroupInfo>> {
        unimplemented!()
    }

    async fn list_hummock_active_write_limits(&self) -> RpcResult<HashMap<u64, WriteLimit>> {
        unimplemented!()
    }

    async fn list_hummock_meta_configs(&self) -> RpcResult<HashMap<String, String>> {
        unimplemented!()
    }

    async fn list_event_log(&self) -> RpcResult<Vec<EventLog>> {
        unimplemented!()
    }

    async fn list_compact_task_assignment(&self) -> RpcResult<Vec<CompactTaskAssignment>> {
        unimplemented!()
    }

    async fn list_all_nodes(&self) -> RpcResult<Vec<WorkerNode>> {
        Ok(vec![])
    }

    async fn list_compact_task_progress(&self) -> RpcResult<Vec<CompactTaskProgress>> {
        unimplemented!()
    }

    async fn recover(&self) -> RpcResult<()> {
        unimplemented!()
    }

    async fn apply_throttle(
        &self,
        _kind: PbThrottleTarget,
        _id: u32,
        _rate_limit: Option<u32>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn get_cluster_recovery_status(&self) -> RpcResult<RecoveryStatus> {
        Ok(RecoveryStatus::StatusRunning)
    }

    async fn get_cluster_limits(&self) -> RpcResult<Vec<ClusterLimit>> {
        Ok(vec![])
    }

    async fn list_rate_limits(&self) -> RpcResult<Vec<RateLimitInfo>> {
        Ok(vec![])
    }

    async fn list_cdc_progress(&self) -> RpcResult<HashMap<u32, PbCdcProgress>> {
        Ok(HashMap::default())
    }

    async fn get_meta_store_endpoint(&self) -> RpcResult<String> {
        unimplemented!()
    }

    async fn alter_sink_props(
        &self,
        _sink_id: u32,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
        _connector_conn_ref: Option<u32>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn alter_source_connector_props(
        &self,
        _source_id: u32,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
        _connector_conn_ref: Option<u32>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn list_hosted_iceberg_tables(&self) -> RpcResult<Vec<IcebergTable>> {
        unimplemented!()
    }

    async fn get_fragment_by_id(
        &self,
        _fragment_id: u32,
    ) -> RpcResult<Option<FragmentDistribution>> {
        unimplemented!()
    }

    fn worker_id(&self) -> u32 {
        0
    }

    async fn set_sync_log_store_aligned(&self, _job_id: u32, _aligned: bool) -> RpcResult<()> {
        Ok(())
    }

    async fn compact_iceberg_table(&self, _sink_id: SinkId) -> RpcResult<u64> {
        Ok(1)
    }

    async fn expire_iceberg_table_snapshots(&self, _sink_id: SinkId) -> RpcResult<()> {
        Ok(())
    }

    async fn refresh(&self, _request: RefreshRequest) -> RpcResult<RefreshResponse> {
        Ok(RefreshResponse { status: None })
    }
}

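/// A protobuf schema fixture; `TestRecordAlterType` and `TestRecordExt` are
/// variants of `TestRecord` with a changed field type and an extra field, for
/// exercising schema changes.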
#[cfg(test)]
pub static PROTO_FILE_DATA: &str = r#"
    syntax = "proto3";
    package test;
    message TestRecord {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
    }
    message TestRecordAlterType {
      string id = 1;
      Country country = 3;
      int32 zipcode = 4;
      float rate = 5;
    }
    message TestRecordExt {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
      string name = 6;
    }
    message Country {
      string address = 1;
      City city = 2;
      string zipcode = 3;
    }
    message City {
      string address = 1;
      string zipcode = 2;
    }"#;

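/// Writes `proto_data` to a temp `.proto` file, compiles it with the system
/// `protoc` into a file-descriptor set, and returns the resulting `.pb` temp
/// file. Requires `protoc` on `PATH`; panics if compilation fails.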
pub fn create_proto_file(proto_data: &str) -> NamedTempFile {
    let in_file = Builder::new()
        .prefix("temp")
        .suffix(".proto")
        .rand_bytes(8)
        .tempfile()
        .unwrap();

    let out_file = Builder::new()
        .prefix("temp")
        .suffix(".pb")
        .rand_bytes(8)
        .tempfile()
        .unwrap();

    let mut file = in_file.as_file();
    file.write_all(proto_data.as_ref())
        .expect("writing binary to test file");
    file.flush().expect("flush temp file failed");
    let include_path = in_file
        .path()
        .parent()
        .unwrap()
        .to_string_lossy()
        .into_owned();
    let out_path = out_file.path().to_string_lossy().into_owned();
    let in_path = in_file.path().to_string_lossy().into_owned();
    let mut compile = std::process::Command::new("protoc");

    let out = compile
        .arg("--include_imports")
        .arg("-I")
        .arg(include_path)
        .arg(format!("--descriptor_set_out={}", out_path))
        .arg(in_path)
        .output()
        .expect("failed to compile proto");
    if !out.status.success() {
        panic!("compile proto failed \n output: {:?}", out);
    }
    out_file
}