use std::collections::{BTreeMap, HashMap, HashSet};
use std::io::Write;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};

use futures_async_stream::for_await;
use parking_lot::RwLock;
use pgwire::net::{Address, AddressRef};
use pgwire::pg_response::StatementType;
use pgwire::pg_server::{BoxedError, SessionId, SessionManager, UserAuthenticator};
use pgwire::types::Row;
use risingwave_common::catalog::{
    DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_ID,
    FunctionId, IndexId, NON_RESERVED_USER_ID, ObjectId, PG_CATALOG_SCHEMA_NAME,
    RW_CATALOG_SCHEMA_NAME, TableId,
};
use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat};
use risingwave_common::session_config::SessionConfig;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::util::cluster_limit::ClusterLimit;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{HummockVersionId, INVALID_VERSION_ID};
use risingwave_pb::backup_service::MetaSnapshotMetadata;
use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
use risingwave_pb::catalog::{
    PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbStreamJobStatus,
    PbSubscription, PbTable, PbView, Table,
};
use risingwave_pb::common::WorkerNode;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::{
    DdlProgress, PbTableJobType, ReplaceJobPlan, TableJobType, alter_name_request,
    alter_set_schema_request, alter_swap_rename_request, create_connection_request,
};
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::{
    BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution;
use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::{
    EventLog, PbTableParallelism, PbThrottleTarget, RecoveryStatus, SystemParams,
};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_pb::user::update_user_request::UpdateField;
use risingwave_pb::user::{GrantPrivilege, UserInfo};
use risingwave_rpc_client::error::Result as RpcResult;
use tempfile::{Builder, NamedTempFile};

use crate::FrontendOpts;
use crate::catalog::catalog_service::CatalogWriter;
use crate::catalog::root_catalog::Catalog;
use crate::catalog::{ConnectionId, DatabaseId, SchemaId, SecretId};
use crate::error::{ErrorCode, Result};
use crate::handler::RwPgResponse;
use crate::meta_client::FrontendMetaClient;
use crate::scheduler::HummockSnapshotManagerRef;
use crate::session::{AuthContext, FrontendEnv, SessionImpl};
use crate::user::UserId;
use crate::user::user_manager::UserInfoManager;
use crate::user::user_service::UserInfoWriter;

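/// An embedded frontend for unit tests: it implements [`SessionManager`] on top of a
/// mocked [`FrontendEnv`], so SQL can be parsed, bound, and planned without a running
/// meta node or compute node.
///
/// A minimal usage sketch (test-only pseudo-usage; assumes a Tokio runtime and that
/// `FrontendOpts` can be constructed with defaults):
///
/// ```ignore
/// let frontend = LocalFrontend::new(FrontendOpts::default()).await;
/// let plan = frontend.get_explain_output("EXPLAIN SELECT 1").await;
/// ```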
pub struct LocalFrontend {
    pub opts: FrontendOpts,
    env: FrontendEnv,
}

impl SessionManager for LocalFrontend {
    type Session = SessionImpl;

    fn create_dummy_session(
        &self,
        _database_id: u32,
        _user_name: u32,
    ) -> std::result::Result<Arc<Self::Session>, BoxedError> {
        unreachable!()
    }

    fn connect(
        &self,
        _database: &str,
        _user_name: &str,
        _peer_addr: AddressRef,
    ) -> std::result::Result<Arc<Self::Session>, BoxedError> {
        Ok(self.session_ref())
    }

    fn cancel_queries_in_session(&self, _session_id: SessionId) {
        unreachable!()
    }

    fn cancel_creating_jobs_in_session(&self, _session_id: SessionId) {
        unreachable!()
    }

    fn end_session(&self, _session: &Self::Session) {
        unreachable!()
    }
}

impl LocalFrontend {
    #[expect(clippy::unused_async)]
    pub async fn new(opts: FrontendOpts) -> Self {
        let env = FrontendEnv::mock();
        Self { opts, env }
    }

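    /// Runs `sql` in a fresh session created by [`Self::session_ref`], i.e. as the
    /// default super user in the default database.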
    pub async fn run_sql(
        &self,
        sql: impl Into<String>,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        self.session_ref().run_statement(sql, vec![]).await
    }

    pub async fn run_sql_with_session(
        &self,
        session_ref: Arc<SessionImpl>,
        sql: impl Into<String>,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        session_ref.run_statement(sql, vec![]).await
    }

    pub async fn run_user_sql(
        &self,
        sql: impl Into<String>,
        database: String,
        user_name: String,
        user_id: UserId,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        self.session_user_ref(database, user_name, user_id)
            .run_statement(sql, vec![])
            .await
    }

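    /// Runs `sql` and returns every result row rendered with its `Debug`
    /// representation. Panics if the statement fails.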
    pub async fn query_formatted_result(&self, sql: impl Into<String>) -> Vec<String> {
        let mut rsp = self.run_sql(sql).await.unwrap();
        let mut res = vec![];
        #[for_await]
        for row_set in rsp.values_stream() {
            for row in row_set.unwrap() {
                res.push(format!("{:?}", row));
            }
        }
        res
    }

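    /// Runs an `EXPLAIN` statement and concatenates its single-column output rows
    /// into one newline-separated string.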
    pub async fn get_explain_output(&self, sql: impl Into<String>) -> String {
        let mut rsp = self.run_sql(sql).await.unwrap();
        assert_eq!(rsp.stmt_type(), StatementType::EXPLAIN);
        let mut res = String::new();
        #[for_await]
        for row_set in rsp.values_stream() {
            for row in row_set.unwrap() {
                let row: Row = row;
                let row = row.values()[0].as_ref().unwrap();
                res += std::str::from_utf8(row).unwrap();
                res += "\n";
            }
        }
        res
    }

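    /// Creates a new session as the default super user in the default database.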
    pub fn session_ref(&self) -> Arc<SessionImpl> {
        self.session_user_ref(
            DEFAULT_DATABASE_NAME.to_owned(),
            DEFAULT_SUPER_USER.to_owned(),
            DEFAULT_SUPER_USER_ID,
        )
    }

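    /// Creates a new session for the given database and user, with no password
    /// authentication and a fixed dummy peer address (`127.0.0.1:6666`).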
    pub fn session_user_ref(
        &self,
        database: String,
        user_name: String,
        user_id: UserId,
    ) -> Arc<SessionImpl> {
        Arc::new(SessionImpl::new(
            self.env.clone(),
            AuthContext::new(database, user_name, user_id),
            UserAuthenticator::None,
            (0, 0),
            Address::Tcp(SocketAddr::new(
                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
                6666,
            ))
            .into(),
            Default::default(),
        ))
    }
}

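/// Like [`LocalFrontend::get_explain_output`], but consumes an already-obtained
/// response. Panics if `rsp` is not the result of an `EXPLAIN` statement.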
pub async fn get_explain_output(mut rsp: RwPgResponse) -> String {
    if rsp.stmt_type() != StatementType::EXPLAIN {
        panic!("RESPONSE INVALID: {rsp:?}");
    }
    let mut res = String::new();
    #[for_await]
    for row_set in rsp.values_stream() {
        for row in row_set.unwrap() {
            let row: Row = row;
            let row = row.values()[0].as_ref().unwrap();
            res += std::str::from_utf8(row).unwrap();
            res += "\n";
        }
    }
    res
}

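/// A [`CatalogWriter`] that applies DDL directly to an in-memory [`Catalog`] instead
/// of issuing RPCs to a meta node. Object ids come from one shared atomic counter,
/// and each object's schema and database are tracked in side maps so that drops can
/// be resolved locally. `DROP ... CASCADE` is rejected as unsupported.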
pub struct MockCatalogWriter {
    catalog: Arc<RwLock<Catalog>>,
    id: AtomicU32,
    table_id_to_schema_id: RwLock<HashMap<u32, SchemaId>>,
    schema_id_to_database_id: RwLock<HashMap<u32, DatabaseId>>,
    hummock_snapshot_manager: HummockSnapshotManagerRef,
}

#[async_trait::async_trait]
impl CatalogWriter for MockCatalogWriter {
    async fn create_database(
        &self,
        db_name: &str,
        owner: UserId,
        resource_group: &str,
    ) -> Result<()> {
        let database_id = self.gen_id();
        self.catalog.write().create_database(&PbDatabase {
            name: db_name.to_owned(),
            id: database_id,
            owner,
            resource_group: resource_group.to_owned(),
        });
        self.create_schema(database_id, DEFAULT_SCHEMA_NAME, owner)
            .await?;
        self.create_schema(database_id, PG_CATALOG_SCHEMA_NAME, owner)
            .await?;
        self.create_schema(database_id, RW_CATALOG_SCHEMA_NAME, owner)
            .await?;
        Ok(())
    }

    async fn create_schema(
        &self,
        db_id: DatabaseId,
        schema_name: &str,
        owner: UserId,
    ) -> Result<()> {
        let id = self.gen_id();
        self.catalog.write().create_schema(&PbSchema {
            id,
            name: schema_name.to_owned(),
            database_id: db_id,
            owner,
        });
        self.add_schema_id(id, db_id);
        Ok(())
    }

    async fn create_materialized_view(
        &self,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
        _dependencies: HashSet<ObjectId>,
        _specific_resource_group: Option<String>,
    ) -> Result<()> {
        table.id = self.gen_id();
        table.stream_job_status = PbStreamJobStatus::Created as _;
        table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
        self.catalog.write().create_table(&table);
        self.add_table_or_source_id(table.id, table.schema_id, table.database_id);
        self.hummock_snapshot_manager
            .add_table_for_test(TableId::new(table.id));
        Ok(())
    }

    async fn create_view(&self, mut view: PbView) -> Result<()> {
        view.id = self.gen_id();
        self.catalog.write().create_view(&view);
        self.add_table_or_source_id(view.id, view.schema_id, view.database_id);
        Ok(())
    }

    async fn create_table(
        &self,
        source: Option<PbSource>,
        mut table: PbTable,
        graph: StreamFragmentGraph,
        _job_type: PbTableJobType,
    ) -> Result<()> {
        if let Some(source) = source {
            let source_id = self.create_source_inner(source)?;
            table.optional_associated_source_id =
                Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id));
        }
        self.create_materialized_view(table, graph, HashSet::new(), None)
            .await?;
        Ok(())
    }

    async fn replace_table(
        &self,
        _source: Option<PbSource>,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
        _mapping: ColIndexMapping,
        _job_type: TableJobType,
    ) -> Result<()> {
        table.stream_job_status = PbStreamJobStatus::Created as _;
        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
        self.catalog.write().update_table(&table);
        Ok(())
    }

    async fn replace_source(
        &self,
        source: PbSource,
        _graph: StreamFragmentGraph,
        _mapping: ColIndexMapping,
    ) -> Result<()> {
        self.catalog.write().update_source(&source);
        Ok(())
    }

    async fn create_source(
        &self,
        source: PbSource,
        _graph: Option<StreamFragmentGraph>,
    ) -> Result<()> {
        self.create_source_inner(source).map(|_| ())
    }

    async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        _affected_table_change: Option<ReplaceJobPlan>,
        _dependencies: HashSet<ObjectId>,
    ) -> Result<()> {
        self.create_sink_inner(sink, graph)
    }

    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
        self.create_subscription_inner(subscription)
    }

    async fn create_index(
        &self,
        mut index: PbIndex,
        mut index_table: PbTable,
        _graph: StreamFragmentGraph,
    ) -> Result<()> {
        index_table.id = self.gen_id();
        index_table.stream_job_status = PbStreamJobStatus::Created as _;
        index_table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
        self.catalog.write().create_table(&index_table);
        self.add_table_or_index_id(
            index_table.id,
            index_table.schema_id,
            index_table.database_id,
        );

        index.id = index_table.id;
        index.index_table_id = index_table.id;
        self.catalog.write().create_index(&index);
        Ok(())
    }

    async fn create_function(&self, _function: PbFunction) -> Result<()> {
        unreachable!()
    }

    async fn create_connection(
        &self,
        _connection_name: String,
        _database_id: u32,
        _schema_id: u32,
        _owner_id: u32,
        _connection: create_connection_request::Payload,
    ) -> Result<()> {
        unreachable!()
    }

    async fn create_secret(
        &self,
        _secret_name: String,
        _database_id: u32,
        _schema_id: u32,
        _owner_id: u32,
        _payload: Vec<u8>,
    ) -> Result<()> {
        unreachable!()
    }

    async fn comment_on(&self, _comment: PbComment) -> Result<()> {
        unreachable!()
    }

    async fn drop_table(
        &self,
        source_id: Option<u32>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        if let Some(source_id) = source_id {
            self.drop_table_or_source_id(source_id);
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.table_id);
        let indexes =
            self.catalog
                .read()
                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
        for index in indexes {
            self.drop_index(index.id, cascade).await?;
        }
        self.catalog
            .write()
            .drop_table(database_id, schema_id, table_id);
        if let Some(source_id) = source_id {
            self.catalog
                .write()
                .drop_source(database_id, schema_id, source_id);
        }
        Ok(())
    }

    async fn drop_view(&self, _view_id: u32, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.table_id);
        let indexes =
            self.catalog
                .read()
                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
        for index in indexes {
            self.drop_index(index.id, cascade).await?;
        }
        self.catalog
            .write()
            .drop_table(database_id, schema_id, table_id);
        Ok(())
    }

    async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(source_id);
        self.catalog
            .write()
            .drop_source(database_id, schema_id, source_id);
        Ok(())
    }

    async fn drop_sink(
        &self,
        sink_id: u32,
        cascade: bool,
        _target_table_change: Option<ReplaceJobPlan>,
    ) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_sink_id(sink_id);
        self.catalog
            .write()
            .drop_sink(database_id, schema_id, sink_id);
        Ok(())
    }

    async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_subscription_id(subscription_id);
        self.catalog
            .write()
            .drop_subscription(database_id, schema_id, subscription_id);
        Ok(())
    }

    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let &schema_id = self
            .table_id_to_schema_id
            .read()
            .get(&index_id.index_id)
            .unwrap();
        let database_id = self.get_database_id_by_schema(schema_id);

        let index = {
            let catalog_reader = self.catalog.read();
            let schema_catalog = catalog_reader
                .get_schema_by_id(&database_id, &schema_id)
                .unwrap();
            schema_catalog.get_index_by_id(&index_id).unwrap().clone()
        };

        let index_table_id = index.index_table.id;
        let (database_id, schema_id) = self.drop_table_or_index_id(index_id.index_id);
        self.catalog
            .write()
            .drop_index(database_id, schema_id, index_id);
        self.catalog
            .write()
            .drop_table(database_id, schema_id, index_table_id);
        Ok(())
    }

    async fn drop_function(&self, _function_id: FunctionId) -> Result<()> {
        unreachable!()
    }

    async fn drop_connection(&self, _connection_id: ConnectionId) -> Result<()> {
        unreachable!()
    }

    async fn drop_secret(&self, _secret_id: SecretId) -> Result<()> {
        unreachable!()
    }

    async fn drop_database(&self, database_id: u32) -> Result<()> {
        self.catalog.write().drop_database(database_id);
        Ok(())
    }

    async fn drop_schema(&self, schema_id: u32, _cascade: bool) -> Result<()> {
        let database_id = self.drop_schema_id(schema_id);
        self.catalog.write().drop_schema(database_id, schema_id);
        Ok(())
    }

    async fn alter_name(
        &self,
        object_id: alter_name_request::Object,
        object_name: &str,
    ) -> Result<()> {
        match object_id {
            alter_name_request::Object::TableId(table_id) => {
                self.catalog
                    .write()
                    .alter_table_name_by_id(&table_id.into(), object_name);
                Ok(())
            }
            _ => {
                unimplemented!()
            }
        }
    }

    async fn alter_source(&self, source: PbSource) -> Result<()> {
        self.catalog.write().update_source(&source);
        Ok(())
    }

    async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<()> {
        for database in self.catalog.read().iter_databases() {
            for schema in database.iter_schemas() {
                match object {
                    Object::TableId(table_id) => {
                        if let Some(table) =
                            schema.get_created_table_by_id(&TableId::from(table_id))
                        {
                            let mut pb_table = table.to_prost();
                            pb_table.owner = owner_id;
                            self.catalog.write().update_table(&pb_table);
                            return Ok(());
                        }
                    }
                    _ => unreachable!(),
                }
            }
        }

        Err(ErrorCode::ItemNotFound(format!("object not found: {:?}", object)).into())
    }

    async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: u32,
    ) -> Result<()> {
        match object {
            alter_set_schema_request::Object::TableId(table_id) => {
                let mut pb_table = {
                    let reader = self.catalog.read();
                    let table = reader.get_any_table_by_id(&table_id.into())?.to_owned();
                    table.to_prost()
                };
                pb_table.schema_id = new_schema_id;
                self.catalog.write().update_table(&pb_table);
                self.table_id_to_schema_id
                    .write()
                    .insert(table_id, new_schema_id);
                Ok(())
            }
            _ => unreachable!(),
        }
    }

    async fn alter_parallelism(
        &self,
        _table_id: u32,
        _parallelism: PbTableParallelism,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }

    async fn alter_swap_rename(&self, _object: alter_swap_rename_request::Object) -> Result<()> {
        todo!()
    }

    async fn alter_secret(
        &self,
        _secret_id: u32,
        _secret_name: String,
        _database_id: u32,
        _schema_id: u32,
        _owner_id: u32,
        _payload: Vec<u8>,
    ) -> Result<()> {
        unreachable!()
    }

    async fn alter_resource_group(
        &self,
        _table_id: u32,
        _resource_group: Option<String>,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }
}

impl MockCatalogWriter {
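    /// Creates a writer whose catalog is preloaded with the default database (id 0)
    /// and its `public`, `pg_catalog`, and `rw_catalog` schemas (ids 1, 2, and 3).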
    pub fn new(
        catalog: Arc<RwLock<Catalog>>,
        hummock_snapshot_manager: HummockSnapshotManagerRef,
    ) -> Self {
        catalog.write().create_database(&PbDatabase {
            id: 0,
            name: DEFAULT_DATABASE_NAME.to_owned(),
            owner: DEFAULT_SUPER_USER_ID,
            resource_group: DEFAULT_RESOURCE_GROUP.to_owned(),
        });
        catalog.write().create_schema(&PbSchema {
            id: 1,
            name: DEFAULT_SCHEMA_NAME.to_owned(),
            database_id: 0,
            owner: DEFAULT_SUPER_USER_ID,
        });
        catalog.write().create_schema(&PbSchema {
            id: 2,
            name: PG_CATALOG_SCHEMA_NAME.to_owned(),
            database_id: 0,
            owner: DEFAULT_SUPER_USER_ID,
        });
        catalog.write().create_schema(&PbSchema {
            id: 3,
            name: RW_CATALOG_SCHEMA_NAME.to_owned(),
            database_id: 0,
            owner: DEFAULT_SUPER_USER_ID,
        });
        let mut map: HashMap<u32, DatabaseId> = HashMap::new();
        map.insert(1_u32, 0_u32);
        map.insert(2_u32, 0_u32);
        map.insert(3_u32, 0_u32);
        Self {
            catalog,
            id: AtomicU32::new(3),
            table_id_to_schema_id: Default::default(),
            schema_id_to_database_id: RwLock::new(map),
            hummock_snapshot_manager,
        }
    }

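    /// Allocates the next object id. The counter starts at 3 (the last id taken by
    /// the built-in schemas), so the first id handed out is 4.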
    fn gen_id(&self) -> u32 {
        self.id.fetch_add(1, Ordering::SeqCst) + 1
    }

    fn add_table_or_source_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn drop_table_or_source_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn add_table_or_sink_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn add_table_or_subscription_id(
        &self,
        table_id: u32,
        schema_id: SchemaId,
        _database_id: DatabaseId,
    ) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn add_table_or_index_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn drop_table_or_sink_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn drop_table_or_subscription_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn drop_table_or_index_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn add_schema_id(&self, schema_id: u32, database_id: DatabaseId) {
        self.schema_id_to_database_id
            .write()
            .insert(schema_id, database_id);
    }

    fn drop_schema_id(&self, schema_id: u32) -> DatabaseId {
        self.schema_id_to_database_id
            .write()
            .remove(&schema_id)
            .unwrap()
    }

    fn create_source_inner(&self, mut source: PbSource) -> Result<u32> {
        source.id = self.gen_id();
        self.catalog.write().create_source(&source);
        self.add_table_or_source_id(source.id, source.schema_id, source.database_id);
        Ok(source.id)
    }

    fn create_sink_inner(&self, mut sink: PbSink, _graph: StreamFragmentGraph) -> Result<()> {
        sink.id = self.gen_id();
        self.catalog.write().create_sink(&sink);
        self.add_table_or_sink_id(sink.id, sink.schema_id, sink.database_id);
        Ok(())
    }

    fn create_subscription_inner(&self, mut subscription: PbSubscription) -> Result<()> {
        subscription.id = self.gen_id();
        self.catalog.write().create_subscription(&subscription);
        self.add_table_or_subscription_id(
            subscription.id,
            subscription.schema_id,
            subscription.database_id,
        );
        Ok(())
    }

    fn get_database_id_by_schema(&self, schema_id: u32) -> DatabaseId {
        *self
            .schema_id_to_database_id
            .read()
            .get(&schema_id)
            .unwrap()
    }
}

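/// A [`UserInfoWriter`] that manipulates an in-memory [`UserInfoManager`] directly,
/// the user-management counterpart of [`MockCatalogWriter`].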
pub struct MockUserInfoWriter {
    id: AtomicU32,
    user_info: Arc<RwLock<UserInfoManager>>,
}

#[async_trait::async_trait]
impl UserInfoWriter for MockUserInfoWriter {
    async fn create_user(&self, user: UserInfo) -> Result<()> {
        let mut user = user;
        user.id = self.gen_id();
        self.user_info.write().create_user(user);
        Ok(())
    }

    async fn drop_user(&self, id: UserId) -> Result<()> {
        self.user_info.write().drop_user(id);
        Ok(())
    }

    async fn update_user(
        &self,
        update_user: UserInfo,
        update_fields: Vec<UpdateField>,
    ) -> Result<()> {
        let mut lock = self.user_info.write();
        let id = update_user.get_id();
        let Some(old_name) = lock.get_user_name_by_id(id) else {
            return Ok(());
        };
        let mut user_info = lock.get_user_by_name(&old_name).unwrap().to_prost();
        update_fields.into_iter().for_each(|field| match field {
            UpdateField::Super => user_info.is_super = update_user.is_super,
            UpdateField::Login => user_info.can_login = update_user.can_login,
            UpdateField::CreateDb => user_info.can_create_db = update_user.can_create_db,
            UpdateField::CreateUser => user_info.can_create_user = update_user.can_create_user,
            UpdateField::AuthInfo => user_info.auth_info.clone_from(&update_user.auth_info),
            UpdateField::Rename => user_info.name.clone_from(&update_user.name),
            UpdateField::Unspecified => unreachable!(),
        });
        lock.update_user(user_info);
        Ok(())
    }

    async fn grant_privilege(
        &self,
        users: Vec<UserId>,
        privileges: Vec<GrantPrivilege>,
        with_grant_option: bool,
        _grantor: UserId,
    ) -> Result<()> {
        let privileges = privileges
            .into_iter()
            .map(|mut p| {
                p.action_with_opts
                    .iter_mut()
                    .for_each(|ao| ao.with_grant_option = with_grant_option);
                p
            })
            .collect::<Vec<_>>();
        for user_id in users {
            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
                u.extend_privileges(privileges.clone());
            }
        }
        Ok(())
    }

    async fn revoke_privilege(
        &self,
        users: Vec<UserId>,
        privileges: Vec<GrantPrivilege>,
        _granted_by: UserId,
        _revoke_by: UserId,
        revoke_grant_option: bool,
        _cascade: bool,
    ) -> Result<()> {
        for user_id in users {
            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
                u.revoke_privileges(privileges.clone(), revoke_grant_option);
            }
        }
        Ok(())
    }
}

impl MockUserInfoWriter {
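    /// Creates a writer preloaded with the default super user; ids for new users
    /// are handed out starting from `NON_RESERVED_USER_ID`.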
    pub fn new(user_info: Arc<RwLock<UserInfoManager>>) -> Self {
        user_info.write().create_user(UserInfo {
            id: DEFAULT_SUPER_USER_ID,
            name: DEFAULT_SUPER_USER.to_owned(),
            is_super: true,
            can_create_db: true,
            can_create_user: true,
            can_login: true,
            ..Default::default()
        });
        Self {
            user_info,
            id: AtomicU32::new(NON_RESERVED_USER_ID as u32),
        }
    }

    fn gen_id(&self) -> u32 {
        self.id.fetch_add(1, Ordering::SeqCst)
    }
}

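/// A [`FrontendMetaClient`] for tests: read paths return canned empty or default
/// values, while RPCs the tests are not expected to exercise are left
/// `unimplemented!()`.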
pub struct MockFrontendMetaClient {}

#[async_trait::async_trait]
impl FrontendMetaClient for MockFrontendMetaClient {
    async fn try_unregister(&self) {}

    async fn flush(&self, _database_id: DatabaseId) -> RpcResult<HummockVersionId> {
        Ok(INVALID_VERSION_ID)
    }

    async fn wait(&self) -> RpcResult<()> {
        Ok(())
    }

    async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult<Vec<u32>> {
        Ok(vec![])
    }

    async fn list_table_fragments(
        &self,
        _table_ids: &[u32],
    ) -> RpcResult<HashMap<u32, TableFragmentInfo>> {
        Ok(HashMap::default())
    }

    async fn list_streaming_job_states(&self) -> RpcResult<Vec<StreamingJobState>> {
        Ok(vec![])
    }

    async fn list_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
        Ok(vec![])
    }

    async fn list_actor_states(&self) -> RpcResult<Vec<ActorState>> {
        Ok(vec![])
    }

    async fn list_actor_splits(&self) -> RpcResult<Vec<ActorSplit>> {
        Ok(vec![])
    }

    async fn list_object_dependencies(&self) -> RpcResult<Vec<PbObjectDependencies>> {
        Ok(vec![])
    }

    async fn list_meta_snapshots(&self) -> RpcResult<Vec<MetaSnapshotMetadata>> {
        Ok(vec![])
    }

    async fn get_system_params(&self) -> RpcResult<SystemParamsReader> {
        Ok(SystemParams::default().into())
    }

    async fn set_system_param(
        &self,
        _param: String,
        _value: Option<String>,
    ) -> RpcResult<Option<SystemParamsReader>> {
        Ok(Some(SystemParams::default().into()))
    }

    async fn get_session_params(&self) -> RpcResult<SessionConfig> {
        Ok(Default::default())
    }

    async fn set_session_param(&self, _param: String, _value: Option<String>) -> RpcResult<String> {
        Ok("".to_owned())
    }

    async fn get_ddl_progress(&self) -> RpcResult<Vec<DdlProgress>> {
        Ok(vec![])
    }

    async fn get_tables(
        &self,
        _table_ids: &[u32],
        _include_dropped_tables: bool,
    ) -> RpcResult<HashMap<u32, Table>> {
        Ok(HashMap::new())
    }

    async fn list_hummock_pinned_versions(&self) -> RpcResult<Vec<(u32, u64)>> {
        unimplemented!()
    }

    async fn get_hummock_current_version(&self) -> RpcResult<HummockVersion> {
        Ok(HummockVersion::default())
    }

    async fn get_hummock_checkpoint_version(&self) -> RpcResult<HummockVersion> {
        unimplemented!()
    }

    async fn list_version_deltas(&self) -> RpcResult<Vec<HummockVersionDelta>> {
        unimplemented!()
    }

    async fn list_branched_objects(&self) -> RpcResult<Vec<BranchedObject>> {
        unimplemented!()
    }

    async fn list_hummock_compaction_group_configs(&self) -> RpcResult<Vec<CompactionGroupInfo>> {
        unimplemented!()
    }

    async fn list_hummock_active_write_limits(&self) -> RpcResult<HashMap<u64, WriteLimit>> {
        unimplemented!()
    }

    async fn list_hummock_meta_configs(&self) -> RpcResult<HashMap<String, String>> {
        unimplemented!()
    }

    async fn list_event_log(&self) -> RpcResult<Vec<EventLog>> {
        unimplemented!()
    }

    async fn list_compact_task_assignment(&self) -> RpcResult<Vec<CompactTaskAssignment>> {
        unimplemented!()
    }

    async fn list_all_nodes(&self) -> RpcResult<Vec<WorkerNode>> {
        Ok(vec![])
    }

    async fn list_compact_task_progress(&self) -> RpcResult<Vec<CompactTaskProgress>> {
        unimplemented!()
    }

    async fn recover(&self) -> RpcResult<()> {
        unimplemented!()
    }

    async fn apply_throttle(
        &self,
        _kind: PbThrottleTarget,
        _id: u32,
        _rate_limit: Option<u32>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn get_cluster_recovery_status(&self) -> RpcResult<RecoveryStatus> {
        Ok(RecoveryStatus::StatusRunning)
    }

    async fn get_cluster_limits(&self) -> RpcResult<Vec<ClusterLimit>> {
        Ok(vec![])
    }

    async fn list_rate_limits(&self) -> RpcResult<Vec<RateLimitInfo>> {
        Ok(vec![])
    }

    async fn get_meta_store_endpoint(&self) -> RpcResult<String> {
        unimplemented!()
    }

    async fn alter_sink_props(
        &self,
        _sink_id: u32,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
        _connector_conn_ref: Option<u32>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn list_hosted_iceberg_tables(&self) -> RpcResult<Vec<IcebergTable>> {
        unimplemented!()
    }
}

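/// A small protobuf schema used by tests, together with two variants of `TestRecord`:
/// one that changes field types (`TestRecordAlterType`) and one that appends a field
/// (`TestRecordExt`).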
#[cfg(test)]
pub static PROTO_FILE_DATA: &str = r#"
    syntax = "proto3";
    package test;
    message TestRecord {
        int32 id = 1;
        Country country = 3;
        int64 zipcode = 4;
        float rate = 5;
    }
    message TestRecordAlterType {
        string id = 1;
        Country country = 3;
        int32 zipcode = 4;
        float rate = 5;
    }
    message TestRecordExt {
        int32 id = 1;
        Country country = 3;
        int64 zipcode = 4;
        float rate = 5;
        string name = 6;
    }
    message Country {
        string address = 1;
        City city = 2;
        string zipcode = 3;
    }
    message City {
        string address = 1;
        string zipcode = 2;
    }"#;

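/// Compiles `proto_data` into a file-descriptor set by shelling out to `protoc`
/// (which therefore must be on `PATH`) and returns the temporary `.pb` file; the
/// file is deleted when the returned [`NamedTempFile`] is dropped.
///
/// A usage sketch (how the descriptor path is consumed is up to the caller):
///
/// ```ignore
/// let file = create_proto_file(PROTO_FILE_DATA);
/// let descriptor_path = file.path().to_string_lossy();
/// ```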
pub fn create_proto_file(proto_data: &str) -> NamedTempFile {
    let in_file = Builder::new()
        .prefix("temp")
        .suffix(".proto")
        .rand_bytes(8)
        .tempfile()
        .unwrap();

    let out_file = Builder::new()
        .prefix("temp")
        .suffix(".pb")
        .rand_bytes(8)
        .tempfile()
        .unwrap();

    let mut file = in_file.as_file();
    file.write_all(proto_data.as_ref())
        .expect("failed to write proto data to temp file");
    file.flush().expect("failed to flush temp file");
    let include_path = in_file
        .path()
        .parent()
        .unwrap()
        .to_string_lossy()
        .into_owned();
    let out_path = out_file.path().to_string_lossy().into_owned();
    let in_path = in_file.path().to_string_lossy().into_owned();
    let mut compile = std::process::Command::new("protoc");

    let out = compile
        .arg("--include_imports")
        .arg("-I")
        .arg(include_path)
        .arg(format!("--descriptor_set_out={}", out_path))
        .arg(in_path)
        .output()
        .expect("failed to compile proto");
    if !out.status.success() {
        panic!("compile proto failed\noutput: {:?}", out);
    }
    out_file
}