use std::collections::{BTreeMap, HashMap, HashSet};
use std::io::Write;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};

use futures_async_stream::for_await;
use parking_lot::RwLock;
use pgwire::net::{Address, AddressRef};
use pgwire::pg_response::StatementType;
use pgwire::pg_server::{BoxedError, SessionId, SessionManager, UserAuthenticator};
use pgwire::types::Row;
use risingwave_common::catalog::{
    AlterDatabaseParam, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER,
    DEFAULT_SUPER_USER_ID, FunctionId, IndexId, NON_RESERVED_USER_ID, ObjectId,
    PG_CATALOG_SCHEMA_NAME, RW_CATALOG_SCHEMA_NAME, TableId,
};
use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat};
use risingwave_common::session_config::SessionConfig;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::util::cluster_limit::ClusterLimit;
use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{HummockVersionId, INVALID_VERSION_ID};
use risingwave_pb::backup_service::MetaSnapshotMetadata;
use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
use risingwave_pb::catalog::{
    PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbStreamJobStatus,
    PbSubscription, PbTable, PbView, Table,
};
use risingwave_pb::common::WorkerNode;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::{
    DdlProgress, PbTableJobType, ReplaceJobPlan, TableJobType, alter_name_request,
    alter_set_schema_request, alter_swap_rename_request, create_connection_request,
};
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::{
    BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::{
    EventLog, FragmentDistribution, PbTableParallelism, PbThrottleTarget, RecoveryStatus,
    RefreshRequest, RefreshResponse, SystemParams,
};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
use risingwave_pb::user::update_user_request::UpdateField;
use risingwave_pb::user::{GrantPrivilege, UserInfo};
use risingwave_rpc_client::error::Result as RpcResult;
use tempfile::{Builder, NamedTempFile};

use crate::FrontendOpts;
use crate::catalog::catalog_service::CatalogWriter;
use crate::catalog::root_catalog::Catalog;
use crate::catalog::{DatabaseId, SchemaId, SecretId, SinkId};
use crate::error::{ErrorCode, Result};
use crate::handler::RwPgResponse;
use crate::meta_client::FrontendMetaClient;
use crate::scheduler::HummockSnapshotManagerRef;
use crate::session::{AuthContext, FrontendEnv, SessionImpl};
use crate::user::UserId;
use crate::user::user_manager::UserInfoManager;
use crate::user::user_service::UserInfoWriter;

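/// An embedded frontend for unit tests: statements run against a mocked
/// [`FrontendEnv`], without starting a meta service or a pgwire server.
///
/// A minimal usage sketch (the options value shown is hypothetical; construct
/// `FrontendOpts` however your test harness does):
/// ```ignore
/// let frontend = LocalFrontend::new(FrontendOpts::default()).await;
/// frontend.run_sql("CREATE TABLE t (v1 int)").await.unwrap();
/// ```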
pub struct LocalFrontend {
    pub opts: FrontendOpts,
    env: FrontendEnv,
}

impl SessionManager for LocalFrontend {
    type Session = SessionImpl;

    fn create_dummy_session(
        &self,
        _database_id: u32,
        _user_name: u32,
    ) -> std::result::Result<Arc<Self::Session>, BoxedError> {
        unreachable!()
    }

    fn connect(
        &self,
        _database: &str,
        _user_name: &str,
        _peer_addr: AddressRef,
    ) -> std::result::Result<Arc<Self::Session>, BoxedError> {
        Ok(self.session_ref())
    }

    fn cancel_queries_in_session(&self, _session_id: SessionId) {
        unreachable!()
    }

    fn cancel_creating_jobs_in_session(&self, _session_id: SessionId) {
        unreachable!()
    }

    fn end_session(&self, _session: &Self::Session) {
        unreachable!()
    }
}

impl LocalFrontend {
    #[expect(clippy::unused_async)]
    pub async fn new(opts: FrontendOpts) -> Self {
        let env = FrontendEnv::mock();
        Self { opts, env }
    }

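    /// Runs `sql` on a fresh default-superuser session.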
    pub async fn run_sql(
        &self,
        sql: impl Into<String>,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        self.session_ref().run_statement(sql, vec![]).await
    }

    pub async fn run_sql_with_session(
        &self,
        session_ref: Arc<SessionImpl>,
        sql: impl Into<String>,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        session_ref.run_statement(sql, vec![]).await
    }

    pub async fn run_user_sql(
        &self,
        sql: impl Into<String>,
        database: String,
        user_name: String,
        user_id: UserId,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        self.session_user_ref(database, user_name, user_id)
            .run_statement(sql, vec![])
            .await
    }

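    /// Runs `sql` and returns each result row rendered with `Debug` formatting.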
    pub async fn query_formatted_result(&self, sql: impl Into<String>) -> Vec<String> {
        let mut rsp = self.run_sql(sql).await.unwrap();
        let mut res = vec![];
        #[for_await]
        for row_set in rsp.values_stream() {
            for row in row_set.unwrap() {
                res.push(format!("{:?}", row));
            }
        }
        res
    }

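    /// Runs an `EXPLAIN` statement and concatenates its output rows into a
    /// newline-separated string.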
    pub async fn get_explain_output(&self, sql: impl Into<String>) -> String {
        let mut rsp = self.run_sql(sql).await.unwrap();
        assert_eq!(rsp.stmt_type(), StatementType::EXPLAIN);
        let mut res = String::new();
        #[for_await]
        for row_set in rsp.values_stream() {
            for row in row_set.unwrap() {
                let row: Row = row;
                let row = row.values()[0].as_ref().unwrap();
                res += std::str::from_utf8(row).unwrap();
                res += "\n";
            }
        }
        res
    }

    pub fn session_ref(&self) -> Arc<SessionImpl> {
        self.session_user_ref(
            DEFAULT_DATABASE_NAME.to_owned(),
            DEFAULT_SUPER_USER.to_owned(),
            DEFAULT_SUPER_USER_ID,
        )
    }

    pub fn session_user_ref(
        &self,
        database: String,
        user_name: String,
        user_id: UserId,
    ) -> Arc<SessionImpl> {
        Arc::new(SessionImpl::new(
            self.env.clone(),
            AuthContext::new(database, user_name, user_id),
            UserAuthenticator::None,
            // Dummy session id.
            (0, 0),
            // Dummy peer address; nothing ever connects to it in tests.
            Address::Tcp(SocketAddr::new(
                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
                6666,
            ))
            .into(),
            Default::default(),
        ))
    }
}

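/// Extracts the output of an `EXPLAIN` response into a newline-separated
/// string. Panics if `rsp` is not an `EXPLAIN` response.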
pub async fn get_explain_output(mut rsp: RwPgResponse) -> String {
    if rsp.stmt_type() != StatementType::EXPLAIN {
        panic!("RESPONSE INVALID: {rsp:?}");
    }
    let mut res = String::new();
    #[for_await]
    for row_set in rsp.values_stream() {
        for row in row_set.unwrap() {
            let row: Row = row;
            let row = row.values()[0].as_ref().unwrap();
            res += std::str::from_utf8(row).unwrap();
            res += "\n";
        }
    }
    res
}

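/// An in-memory [`CatalogWriter`] for tests: DDL mutates a shared [`Catalog`]
/// directly, with ids handed out by a local atomic counter, so no meta service
/// is involved.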
pub struct MockCatalogWriter {
    catalog: Arc<RwLock<Catalog>>,
    id: AtomicU32,
    table_id_to_schema_id: RwLock<HashMap<u32, SchemaId>>,
    schema_id_to_database_id: RwLock<HashMap<u32, DatabaseId>>,
    hummock_snapshot_manager: HummockSnapshotManagerRef,
}

#[async_trait::async_trait]
impl CatalogWriter for MockCatalogWriter {
    async fn create_database(
        &self,
        db_name: &str,
        owner: UserId,
        resource_group: &str,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
    ) -> Result<()> {
        let database_id = self.gen_id();
        self.catalog.write().create_database(&PbDatabase {
            name: db_name.to_owned(),
            id: database_id,
            owner,
            resource_group: resource_group.to_owned(),
            barrier_interval_ms,
            checkpoint_frequency,
        });
        self.create_schema(database_id, DEFAULT_SCHEMA_NAME, owner)
            .await?;
        self.create_schema(database_id, PG_CATALOG_SCHEMA_NAME, owner)
            .await?;
        self.create_schema(database_id, RW_CATALOG_SCHEMA_NAME, owner)
            .await?;
        Ok(())
    }

    async fn create_schema(
        &self,
        db_id: DatabaseId,
        schema_name: &str,
        owner: UserId,
    ) -> Result<()> {
        let id = self.gen_id();
        self.catalog.write().create_schema(&PbSchema {
            id,
            name: schema_name.to_owned(),
            database_id: db_id,
            owner,
        });
        self.add_schema_id(id, db_id);
        Ok(())
    }

    async fn create_materialized_view(
        &self,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
        _dependencies: HashSet<ObjectId>,
        _specific_resource_group: Option<String>,
        _if_not_exists: bool,
    ) -> Result<()> {
        table.id = self.gen_id();
        table.stream_job_status = PbStreamJobStatus::Created as _;
        table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
        self.catalog.write().create_table(&table);
        self.add_table_or_source_id(table.id, table.schema_id, table.database_id);
        self.hummock_snapshot_manager
            .add_table_for_test(TableId::new(table.id));
        Ok(())
    }

    async fn replace_materialized_view(
        &self,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
    ) -> Result<()> {
        table.stream_job_status = PbStreamJobStatus::Created as _;
        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
        self.catalog.write().update_table(&table);
        Ok(())
    }

    async fn create_view(&self, mut view: PbView, _dependencies: HashSet<ObjectId>) -> Result<()> {
        view.id = self.gen_id();
        self.catalog.write().create_view(&view);
        self.add_table_or_source_id(view.id, view.schema_id, view.database_id);
        Ok(())
    }

    async fn create_table(
        &self,
        source: Option<PbSource>,
        mut table: PbTable,
        graph: StreamFragmentGraph,
        _job_type: PbTableJobType,
        if_not_exists: bool,
        _dependencies: HashSet<ObjectId>,
    ) -> Result<()> {
        if let Some(source) = source {
            let source_id = self.create_source_inner(source)?;
            table.optional_associated_source_id =
                Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id));
        }
        self.create_materialized_view(table, graph, HashSet::new(), None, if_not_exists)
            .await?;
        Ok(())
    }

    async fn replace_table(
        &self,
        _source: Option<PbSource>,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
        _job_type: TableJobType,
    ) -> Result<()> {
        table.stream_job_status = PbStreamJobStatus::Created as _;
        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
        self.catalog.write().update_table(&table);
        Ok(())
    }

    async fn replace_source(&self, source: PbSource, _graph: StreamFragmentGraph) -> Result<()> {
        self.catalog.write().update_source(&source);
        Ok(())
    }

    async fn create_source(
        &self,
        source: PbSource,
        _graph: Option<StreamFragmentGraph>,
        _if_not_exists: bool,
    ) -> Result<()> {
        self.create_source_inner(source).map(|_| ())
    }

    async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        _affected_table_change: Option<ReplaceJobPlan>,
        _dependencies: HashSet<ObjectId>,
        _if_not_exists: bool,
    ) -> Result<()> {
        self.create_sink_inner(sink, graph)
    }

    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
        self.create_subscription_inner(subscription)
    }

    async fn create_index(
        &self,
        mut index: PbIndex,
        mut index_table: PbTable,
        _graph: StreamFragmentGraph,
        _if_not_exists: bool,
    ) -> Result<()> {
        index_table.id = self.gen_id();
        index_table.stream_job_status = PbStreamJobStatus::Created as _;
        index_table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
        self.catalog.write().create_table(&index_table);
        self.add_table_or_index_id(
            index_table.id,
            index_table.schema_id,
            index_table.database_id,
        );

        index.id = index_table.id;
        index.index_table_id = index_table.id;
        self.catalog.write().create_index(&index);
        Ok(())
    }

    async fn create_function(&self, _function: PbFunction) -> Result<()> {
        unreachable!()
    }

    async fn create_connection(
        &self,
        _connection_name: String,
        _database_id: u32,
        _schema_id: u32,
        _owner_id: u32,
        _connection: create_connection_request::Payload,
    ) -> Result<()> {
        unreachable!()
    }

    async fn create_secret(
        &self,
        _secret_name: String,
        _database_id: u32,
        _schema_id: u32,
        _owner_id: u32,
        _payload: Vec<u8>,
    ) -> Result<()> {
        unreachable!()
    }

    async fn comment_on(&self, _comment: PbComment) -> Result<()> {
        unreachable!()
    }

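    /// Drops the table together with its associated source (if any) and all
    /// indexes built on it. `cascade` is not supported by the mock writer.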
    async fn drop_table(
        &self,
        source_id: Option<u32>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        if let Some(source_id) = source_id {
            self.drop_table_or_source_id(source_id);
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.table_id);
        let indexes =
            self.catalog
                .read()
                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
        for index in indexes {
            self.drop_index(index.id, cascade).await?;
        }
        self.catalog
            .write()
            .drop_table(database_id, schema_id, table_id);
        if let Some(source_id) = source_id {
            self.catalog
                .write()
                .drop_source(database_id, schema_id, source_id);
        }
        Ok(())
    }

    async fn drop_view(&self, _view_id: u32, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.table_id);
        let indexes =
            self.catalog
                .read()
                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
        for index in indexes {
            self.drop_index(index.id, cascade).await?;
        }
        self.catalog
            .write()
            .drop_table(database_id, schema_id, table_id);
        Ok(())
    }

    async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(source_id);
        self.catalog
            .write()
            .drop_source(database_id, schema_id, source_id);
        Ok(())
    }

    async fn drop_sink(
        &self,
        sink_id: u32,
        cascade: bool,
        _target_table_change: Option<ReplaceJobPlan>,
    ) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_sink_id(sink_id);
        self.catalog
            .write()
            .drop_sink(database_id, schema_id, sink_id);
        Ok(())
    }

    async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_subscription_id(subscription_id);
        self.catalog
            .write()
            .drop_subscription(database_id, schema_id, subscription_id);
        Ok(())
    }

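    /// Drops the index as well as the internal table that backs it.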
    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let &schema_id = self
            .table_id_to_schema_id
            .read()
            .get(&index_id.index_id)
            .unwrap();
        let database_id = self.get_database_id_by_schema(schema_id);

        let index = {
            let catalog_reader = self.catalog.read();
            let schema_catalog = catalog_reader
                .get_schema_by_id(&database_id, &schema_id)
                .unwrap();
            schema_catalog.get_index_by_id(&index_id).unwrap().clone()
        };

        let index_table_id = index.index_table().id;
        let (database_id, schema_id) = self.drop_table_or_index_id(index_id.index_id);
        self.catalog
            .write()
            .drop_index(database_id, schema_id, index_id);
        self.catalog
            .write()
            .drop_table(database_id, schema_id, index_table_id);
        Ok(())
    }

    async fn drop_function(&self, _function_id: FunctionId) -> Result<()> {
        unreachable!()
    }

    async fn drop_connection(&self, _connection_id: u32, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_secret(&self, _secret_id: SecretId) -> Result<()> {
        unreachable!()
    }

    async fn drop_database(&self, database_id: u32) -> Result<()> {
        self.catalog.write().drop_database(database_id);
        Ok(())
    }

    async fn drop_schema(&self, schema_id: u32, _cascade: bool) -> Result<()> {
        let database_id = self.drop_schema_id(schema_id);
        self.catalog.write().drop_schema(database_id, schema_id);
        Ok(())
    }

    async fn alter_name(
        &self,
        object_id: alter_name_request::Object,
        object_name: &str,
    ) -> Result<()> {
        match object_id {
            alter_name_request::Object::TableId(table_id) => {
                self.catalog
                    .write()
                    .alter_table_name_by_id(&table_id.into(), object_name);
                Ok(())
            }
            _ => {
                unimplemented!()
            }
        }
    }

    async fn alter_source(&self, source: PbSource) -> Result<()> {
        self.catalog.write().update_source(&source);
        Ok(())
    }

    async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<()> {
        for database in self.catalog.read().iter_databases() {
            for schema in database.iter_schemas() {
                match object {
                    Object::TableId(table_id) => {
                        if let Some(table) =
                            schema.get_created_table_by_id(&TableId::from(table_id))
                        {
                            let mut pb_table = table.to_prost();
                            pb_table.owner = owner_id;
                            self.catalog.write().update_table(&pb_table);
                            return Ok(());
                        }
                    }
                    _ => unreachable!(),
                }
            }
        }

        Err(ErrorCode::ItemNotFound(format!("object not found: {:?}", object)).into())
    }

    async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: u32,
    ) -> Result<()> {
        match object {
            alter_set_schema_request::Object::TableId(table_id) => {
                let mut pb_table = {
                    let reader = self.catalog.read();
                    let table = reader.get_any_table_by_id(&table_id.into())?.to_owned();
                    table.to_prost()
                };
                pb_table.schema_id = new_schema_id;
                self.catalog.write().update_table(&pb_table);
                self.table_id_to_schema_id
                    .write()
                    .insert(table_id, new_schema_id);
                Ok(())
            }
            _ => unreachable!(),
        }
    }

    async fn alter_parallelism(
        &self,
        _table_id: u32,
        _parallelism: PbTableParallelism,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }

    async fn alter_swap_rename(&self, _object: alter_swap_rename_request::Object) -> Result<()> {
        todo!()
    }

    async fn alter_secret(
        &self,
        _secret_id: u32,
        _secret_name: String,
        _database_id: u32,
        _schema_id: u32,
        _owner_id: u32,
        _payload: Vec<u8>,
    ) -> Result<()> {
        unreachable!()
    }

    async fn alter_resource_group(
        &self,
        _table_id: u32,
        _resource_group: Option<String>,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }

    async fn alter_database_param(
        &self,
        database_id: u32,
        param: AlterDatabaseParam,
    ) -> Result<()> {
        let mut pb_database = {
            let reader = self.catalog.read();
            let database = reader.get_database_by_id(&database_id)?.to_owned();
            database.to_prost()
        };
        match param {
            AlterDatabaseParam::BarrierIntervalMs(interval) => {
                pb_database.barrier_interval_ms = interval;
            }
            AlterDatabaseParam::CheckpointFrequency(frequency) => {
                pb_database.checkpoint_frequency = frequency;
            }
        }
        self.catalog.write().update_database(&pb_database);
        Ok(())
    }
}

impl MockCatalogWriter {
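    /// Seeds the catalog with database 0 (the default database) and schemas 1-3
    /// (the default, `pg_catalog` and `rw_catalog` schemas). The id counter
    /// starts at 3 accordingly, so [`Self::gen_id`] allocates ids from 4.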
    pub fn new(
        catalog: Arc<RwLock<Catalog>>,
        hummock_snapshot_manager: HummockSnapshotManagerRef,
    ) -> Self {
        catalog.write().create_database(&PbDatabase {
            id: 0,
            name: DEFAULT_DATABASE_NAME.to_owned(),
            owner: DEFAULT_SUPER_USER_ID,
            resource_group: DEFAULT_RESOURCE_GROUP.to_owned(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        });
        catalog.write().create_schema(&PbSchema {
            id: 1,
            name: DEFAULT_SCHEMA_NAME.to_owned(),
            database_id: 0,
            owner: DEFAULT_SUPER_USER_ID,
        });
        catalog.write().create_schema(&PbSchema {
            id: 2,
            name: PG_CATALOG_SCHEMA_NAME.to_owned(),
            database_id: 0,
            owner: DEFAULT_SUPER_USER_ID,
        });
        catalog.write().create_schema(&PbSchema {
            id: 3,
            name: RW_CATALOG_SCHEMA_NAME.to_owned(),
            database_id: 0,
            owner: DEFAULT_SUPER_USER_ID,
        });
        let mut map: HashMap<u32, DatabaseId> = HashMap::new();
        map.insert(1_u32, 0_u32);
        map.insert(2_u32, 0_u32);
        map.insert(3_u32, 0_u32);
        Self {
            catalog,
            id: AtomicU32::new(3),
            table_id_to_schema_id: Default::default(),
            schema_id_to_database_id: RwLock::new(map),
            hummock_snapshot_manager,
        }
    }

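    /// Ids 0..=3 are taken by the pre-seeded database and schemas, so skip past
    /// them when allocating.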
    fn gen_id(&self) -> u32 {
        self.id.fetch_add(1, Ordering::SeqCst) + 1
    }

    fn add_table_or_source_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn drop_table_or_source_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn add_table_or_sink_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn add_table_or_subscription_id(
        &self,
        table_id: u32,
        schema_id: SchemaId,
        _database_id: DatabaseId,
    ) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn add_table_or_index_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn drop_table_or_sink_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn drop_table_or_subscription_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn drop_table_or_index_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn add_schema_id(&self, schema_id: u32, database_id: DatabaseId) {
        self.schema_id_to_database_id
            .write()
            .insert(schema_id, database_id);
    }

    fn drop_schema_id(&self, schema_id: u32) -> DatabaseId {
        self.schema_id_to_database_id
            .write()
            .remove(&schema_id)
            .unwrap()
    }

    fn create_source_inner(&self, mut source: PbSource) -> Result<u32> {
        source.id = self.gen_id();
        self.catalog.write().create_source(&source);
        self.add_table_or_source_id(source.id, source.schema_id, source.database_id);
        Ok(source.id)
    }

    fn create_sink_inner(&self, mut sink: PbSink, _graph: StreamFragmentGraph) -> Result<()> {
        sink.id = self.gen_id();
        sink.stream_job_status = PbStreamJobStatus::Created as _;
        self.catalog.write().create_sink(&sink);
        self.add_table_or_sink_id(sink.id, sink.schema_id, sink.database_id);
        Ok(())
    }

    fn create_subscription_inner(&self, mut subscription: PbSubscription) -> Result<()> {
        subscription.id = self.gen_id();
        self.catalog.write().create_subscription(&subscription);
        self.add_table_or_subscription_id(
            subscription.id,
            subscription.schema_id,
            subscription.database_id,
        );
        Ok(())
    }

    fn get_database_id_by_schema(&self, schema_id: u32) -> DatabaseId {
        *self
            .schema_id_to_database_id
            .read()
            .get(&schema_id)
            .unwrap()
    }
}

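/// An in-memory [`UserInfoWriter`] for tests that manipulates a shared
/// [`UserInfoManager`] directly, pre-seeded with the default superuser.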
pub struct MockUserInfoWriter {
    id: AtomicU32,
    user_info: Arc<RwLock<UserInfoManager>>,
}

#[async_trait::async_trait]
impl UserInfoWriter for MockUserInfoWriter {
    async fn create_user(&self, mut user: UserInfo) -> Result<()> {
        user.id = self.gen_id();
        self.user_info.write().create_user(user);
        Ok(())
    }

    async fn drop_user(&self, id: UserId) -> Result<()> {
        self.user_info.write().drop_user(id);
        Ok(())
    }

    async fn update_user(
        &self,
        update_user: UserInfo,
        update_fields: Vec<UpdateField>,
    ) -> Result<()> {
        let mut lock = self.user_info.write();
        let id = update_user.get_id();
        let Some(old_name) = lock.get_user_name_by_id(id) else {
            return Ok(());
        };
        let mut user_info = lock.get_user_by_name(&old_name).unwrap().to_prost();
        update_fields.into_iter().for_each(|field| match field {
            UpdateField::Super => user_info.is_super = update_user.is_super,
            UpdateField::Login => user_info.can_login = update_user.can_login,
            UpdateField::CreateDb => user_info.can_create_db = update_user.can_create_db,
            UpdateField::CreateUser => user_info.can_create_user = update_user.can_create_user,
            UpdateField::AuthInfo => user_info.auth_info.clone_from(&update_user.auth_info),
            UpdateField::Rename => user_info.name.clone_from(&update_user.name),
            UpdateField::Unspecified => unreachable!(),
        });
        // Persist the merged user info rather than the raw request, so fields
        // outside `update_fields` keep their stored values.
        lock.update_user(user_info);
        Ok(())
    }

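    /// Grants `privileges` to every user in `users`, forcing `with_grant_option`
    /// onto each granted action.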
    async fn grant_privilege(
        &self,
        users: Vec<UserId>,
        privileges: Vec<GrantPrivilege>,
        with_grant_option: bool,
        _grantor: UserId,
    ) -> Result<()> {
        let privileges = privileges
            .into_iter()
            .map(|mut p| {
                p.action_with_opts
                    .iter_mut()
                    .for_each(|ao| ao.with_grant_option = with_grant_option);
                p
            })
            .collect::<Vec<_>>();
        for user_id in users {
            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
                u.extend_privileges(privileges.clone());
            }
        }
        Ok(())
    }

    async fn revoke_privilege(
        &self,
        users: Vec<UserId>,
        privileges: Vec<GrantPrivilege>,
        _granted_by: UserId,
        _revoke_by: UserId,
        revoke_grant_option: bool,
        _cascade: bool,
    ) -> Result<()> {
        for user_id in users {
            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
                u.revoke_privileges(privileges.clone(), revoke_grant_option);
            }
        }
        Ok(())
    }

    async fn alter_default_privilege(
        &self,
        _users: Vec<UserId>,
        _database_id: DatabaseId,
        _schemas: Vec<SchemaId>,
        _operation: AlterDefaultPrivilegeOperation,
        _operated_by: UserId,
    ) -> Result<()> {
        todo!()
    }
}

impl MockUserInfoWriter {
    pub fn new(user_info: Arc<RwLock<UserInfoManager>>) -> Self {
        user_info.write().create_user(UserInfo {
            id: DEFAULT_SUPER_USER_ID,
            name: DEFAULT_SUPER_USER.to_owned(),
            is_super: true,
            can_create_db: true,
            can_create_user: true,
            can_login: true,
            ..Default::default()
        });
        Self {
            user_info,
            id: AtomicU32::new(NON_RESERVED_USER_ID as u32),
        }
    }

    fn gen_id(&self) -> u32 {
        self.id.fetch_add(1, Ordering::SeqCst)
    }
}

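/// A stub [`FrontendMetaClient`]: the calls tests exercise return empty or
/// default values, while the rest are `unimplemented!()`.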
pub struct MockFrontendMetaClient {}

#[async_trait::async_trait]
impl FrontendMetaClient for MockFrontendMetaClient {
    async fn try_unregister(&self) {}

    async fn flush(&self, _database_id: DatabaseId) -> RpcResult<HummockVersionId> {
        Ok(INVALID_VERSION_ID)
    }

    async fn wait(&self) -> RpcResult<()> {
        Ok(())
    }

    async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult<Vec<u32>> {
        Ok(vec![])
    }

    async fn list_table_fragments(
        &self,
        _table_ids: &[u32],
    ) -> RpcResult<HashMap<u32, TableFragmentInfo>> {
        Ok(HashMap::default())
    }

    async fn list_streaming_job_states(&self) -> RpcResult<Vec<StreamingJobState>> {
        Ok(vec![])
    }

    async fn list_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
        Ok(vec![])
    }

    async fn list_creating_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
        Ok(vec![])
    }

    async fn list_actor_states(&self) -> RpcResult<Vec<ActorState>> {
        Ok(vec![])
    }

    async fn list_actor_splits(&self) -> RpcResult<Vec<ActorSplit>> {
        Ok(vec![])
    }

    async fn list_object_dependencies(&self) -> RpcResult<Vec<PbObjectDependencies>> {
        Ok(vec![])
    }

    async fn list_meta_snapshots(&self) -> RpcResult<Vec<MetaSnapshotMetadata>> {
        Ok(vec![])
    }

    async fn set_system_param(
        &self,
        _param: String,
        _value: Option<String>,
    ) -> RpcResult<Option<SystemParamsReader>> {
        Ok(Some(SystemParams::default().into()))
    }

    async fn get_session_params(&self) -> RpcResult<SessionConfig> {
        Ok(Default::default())
    }

    async fn set_session_param(&self, _param: String, _value: Option<String>) -> RpcResult<String> {
        Ok("".to_owned())
    }

    async fn get_ddl_progress(&self) -> RpcResult<Vec<DdlProgress>> {
        Ok(vec![])
    }

    async fn get_tables(
        &self,
        _table_ids: &[u32],
        _include_dropped_tables: bool,
    ) -> RpcResult<HashMap<u32, Table>> {
        Ok(HashMap::new())
    }

    async fn list_hummock_pinned_versions(&self) -> RpcResult<Vec<(u32, u64)>> {
        unimplemented!()
    }

    async fn get_hummock_current_version(&self) -> RpcResult<HummockVersion> {
        Ok(HummockVersion::default())
    }

    async fn get_hummock_checkpoint_version(&self) -> RpcResult<HummockVersion> {
        unimplemented!()
    }

    async fn list_version_deltas(&self) -> RpcResult<Vec<HummockVersionDelta>> {
        unimplemented!()
    }

    async fn list_branched_objects(&self) -> RpcResult<Vec<BranchedObject>> {
        unimplemented!()
    }

    async fn list_hummock_compaction_group_configs(&self) -> RpcResult<Vec<CompactionGroupInfo>> {
        unimplemented!()
    }

    async fn list_hummock_active_write_limits(&self) -> RpcResult<HashMap<u64, WriteLimit>> {
        unimplemented!()
    }

    async fn list_hummock_meta_configs(&self) -> RpcResult<HashMap<String, String>> {
        unimplemented!()
    }

    async fn list_event_log(&self) -> RpcResult<Vec<EventLog>> {
        unimplemented!()
    }

    async fn list_compact_task_assignment(&self) -> RpcResult<Vec<CompactTaskAssignment>> {
        unimplemented!()
    }

    async fn list_all_nodes(&self) -> RpcResult<Vec<WorkerNode>> {
        Ok(vec![])
    }

    async fn list_compact_task_progress(&self) -> RpcResult<Vec<CompactTaskProgress>> {
        unimplemented!()
    }

    async fn recover(&self) -> RpcResult<()> {
        unimplemented!()
    }

    async fn apply_throttle(
        &self,
        _kind: PbThrottleTarget,
        _id: u32,
        _rate_limit: Option<u32>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn get_cluster_recovery_status(&self) -> RpcResult<RecoveryStatus> {
        Ok(RecoveryStatus::StatusRunning)
    }

    async fn get_cluster_limits(&self) -> RpcResult<Vec<ClusterLimit>> {
        Ok(vec![])
    }

    async fn list_rate_limits(&self) -> RpcResult<Vec<RateLimitInfo>> {
        Ok(vec![])
    }

    async fn get_meta_store_endpoint(&self) -> RpcResult<String> {
        unimplemented!()
    }

    async fn alter_sink_props(
        &self,
        _sink_id: u32,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
        _connector_conn_ref: Option<u32>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn alter_source_connector_props(
        &self,
        _source_id: u32,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
        _connector_conn_ref: Option<u32>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn list_hosted_iceberg_tables(&self) -> RpcResult<Vec<IcebergTable>> {
        unimplemented!()
    }

    async fn get_fragment_by_id(
        &self,
        _fragment_id: u32,
    ) -> RpcResult<Option<FragmentDistribution>> {
        unimplemented!()
    }

    fn worker_id(&self) -> u32 {
        0
    }

    async fn set_sync_log_store_aligned(&self, _job_id: u32, _aligned: bool) -> RpcResult<()> {
        Ok(())
    }

    async fn compact_iceberg_table(&self, _sink_id: SinkId) -> RpcResult<u64> {
        Ok(1)
    }

    async fn refresh(&self, _request: RefreshRequest) -> RpcResult<RefreshResponse> {
        Ok(RefreshResponse { status: None })
    }
}

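/// Protobuf schemas used by tests. Relative to `TestRecord`,
/// `TestRecordAlterType` changes the types of `id` and `zipcode`, and
/// `TestRecordExt` appends a `name` field.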
#[cfg(test)]
pub static PROTO_FILE_DATA: &str = r#"
    syntax = "proto3";
    package test;
    message TestRecord {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
    }
    message TestRecordAlterType {
      string id = 1;
      Country country = 3;
      int32 zipcode = 4;
      float rate = 5;
    }
    message TestRecordExt {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
      string name = 6;
    }
    message Country {
      string address = 1;
      City city = 2;
      string zipcode = 3;
    }
    message City {
      string address = 1;
      string zipcode = 2;
    }"#;

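/// Writes `proto_data` to a temporary `.proto` file and shells out to `protoc`
/// to compile it into a file descriptor set, returning the temporary `.pb`
/// file. Requires `protoc` to be available on `PATH`.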
pub fn create_proto_file(proto_data: &str) -> NamedTempFile {
    let in_file = Builder::new()
        .prefix("temp")
        .suffix(".proto")
        .rand_bytes(8)
        .tempfile()
        .unwrap();

    let out_file = Builder::new()
        .prefix("temp")
        .suffix(".pb")
        .rand_bytes(8)
        .tempfile()
        .unwrap();

    let mut file = in_file.as_file();
    file.write_all(proto_data.as_ref())
        .expect("writing binary to test file");
    file.flush().expect("flush temp file failed");
    let include_path = in_file
        .path()
        .parent()
        .unwrap()
        .to_string_lossy()
        .into_owned();
    let out_path = out_file.path().to_string_lossy().into_owned();
    let in_path = in_file.path().to_string_lossy().into_owned();
    let mut compile = std::process::Command::new("protoc");

    let out = compile
        .arg("--include_imports")
        .arg("-I")
        .arg(include_path)
        .arg(format!("--descriptor_set_out={}", out_path))
        .arg(in_path)
        .output()
        .expect("failed to compile proto");
    if !out.status.success() {
        panic!("compile proto failed \n output: {:?}", out);
    }
    out_file
}