use std::collections::{HashMap, HashSet};
use std::io::Write;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};

use futures_async_stream::for_await;
use parking_lot::RwLock;
use pgwire::net::{Address, AddressRef};
use pgwire::pg_response::StatementType;
use pgwire::pg_server::{BoxedError, SessionId, SessionManager, UserAuthenticator};
use pgwire::types::Row;
use risingwave_common::catalog::{
    DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_ID,
    FunctionId, IndexId, NON_RESERVED_USER_ID, ObjectId, PG_CATALOG_SCHEMA_NAME,
    RW_CATALOG_SCHEMA_NAME, TableId,
};
use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat};
use risingwave_common::session_config::SessionConfig;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::util::cluster_limit::ClusterLimit;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{HummockVersionId, INVALID_VERSION_ID};
use risingwave_pb::backup_service::MetaSnapshotMetadata;
use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
use risingwave_pb::catalog::{
    PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbStreamJobStatus,
    PbSubscription, PbTable, PbView, Table,
};
use risingwave_pb::common::WorkerNode;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::{
    DdlProgress, PbTableJobType, ReplaceJobPlan, TableJobType, alter_name_request,
    alter_set_schema_request, alter_swap_rename_request, create_connection_request,
};
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::{
    BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::{
    EventLog, PbTableParallelism, PbThrottleTarget, RecoveryStatus, SystemParams,
};
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_pb::user::update_user_request::UpdateField;
use risingwave_pb::user::{GrantPrivilege, UserInfo};
use risingwave_rpc_client::error::Result as RpcResult;
use tempfile::{Builder, NamedTempFile};

use crate::FrontendOpts;
use crate::catalog::catalog_service::CatalogWriter;
use crate::catalog::root_catalog::Catalog;
use crate::catalog::{ConnectionId, DatabaseId, SchemaId, SecretId};
use crate::error::{ErrorCode, Result};
use crate::handler::RwPgResponse;
use crate::meta_client::FrontendMetaClient;
use crate::scheduler::HummockSnapshotManagerRef;
use crate::session::{AuthContext, FrontendEnv, SessionImpl};
use crate::user::UserId;
use crate::user::user_manager::UserInfoManager;
use crate::user::user_service::UserInfoWriter;

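/// An embedded frontend for tests: it executes SQL on [`SessionImpl`]s backed by
/// a mocked [`FrontendEnv`], without starting a meta node or a pgwire server.
///
/// A minimal usage sketch (assuming a tokio test context and a `Default`
/// implementation for `FrontendOpts`):
///
/// ```ignore
/// let frontend = LocalFrontend::new(FrontendOpts::default()).await;
/// frontend.run_sql("CREATE TABLE t (v INT)").await.unwrap();
/// let plan = frontend.get_explain_output("EXPLAIN SELECT v FROM t").await;
/// ```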
pub struct LocalFrontend {
    pub opts: FrontendOpts,
    env: FrontendEnv,
}

impl SessionManager for LocalFrontend {
    type Session = SessionImpl;

    fn create_dummy_session(
        &self,
        _database_id: u32,
        _user_id: u32,
    ) -> std::result::Result<Arc<Self::Session>, BoxedError> {
        unreachable!()
    }

    fn connect(
        &self,
        _database: &str,
        _user_name: &str,
        _peer_addr: AddressRef,
    ) -> std::result::Result<Arc<Self::Session>, BoxedError> {
        Ok(self.session_ref())
    }

    fn cancel_queries_in_session(&self, _session_id: SessionId) {
        unreachable!()
    }

    fn cancel_creating_jobs_in_session(&self, _session_id: SessionId) {
        unreachable!()
    }

    fn end_session(&self, _session: &Self::Session) {
        unreachable!()
    }
}

impl LocalFrontend {
    #[expect(clippy::unused_async)]
    pub async fn new(opts: FrontendOpts) -> Self {
        let env = FrontendEnv::mock();
        Self { opts, env }
    }

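    /// Runs a single SQL statement on a fresh default-superuser session and
    /// returns the response.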
    pub async fn run_sql(
        &self,
        sql: impl Into<String>,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        self.session_ref().run_statement(sql, vec![]).await
    }

    pub async fn run_sql_with_session(
        &self,
        session_ref: Arc<SessionImpl>,
        sql: impl Into<String>,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        session_ref.run_statement(sql, vec![]).await
    }

    pub async fn run_user_sql(
        &self,
        sql: impl Into<String>,
        database: String,
        user_name: String,
        user_id: UserId,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        self.session_user_ref(database, user_name, user_id)
            .run_statement(sql, vec![])
            .await
    }

    pub async fn query_formatted_result(&self, sql: impl Into<String>) -> Vec<String> {
        let mut rsp = self.run_sql(sql).await.unwrap();
        let mut res = vec![];
        #[for_await]
        for row_set in rsp.values_stream() {
            for row in row_set.unwrap() {
                res.push(format!("{:?}", row));
            }
        }
        res
    }

    pub async fn get_explain_output(&self, sql: impl Into<String>) -> String {
        let mut rsp = self.run_sql(sql).await.unwrap();
        assert_eq!(rsp.stmt_type(), StatementType::EXPLAIN);
        let mut res = String::new();
        #[for_await]
        for row_set in rsp.values_stream() {
            for row in row_set.unwrap() {
                let row: Row = row;
                let row = row.values()[0].as_ref().unwrap();
                res += std::str::from_utf8(row).unwrap();
                res += "\n";
            }
        }
        res
    }

    pub fn session_ref(&self) -> Arc<SessionImpl> {
        self.session_user_ref(
            DEFAULT_DATABASE_NAME.to_owned(),
            DEFAULT_SUPER_USER.to_owned(),
            DEFAULT_SUPER_USER_ID,
        )
    }

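    /// Creates a session for the given database and user. The session id and
    /// peer address are placeholders (`(0, 0)` and `127.0.0.1:6666`), since no
    /// real client connection exists in tests.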
    pub fn session_user_ref(
        &self,
        database: String,
        user_name: String,
        user_id: UserId,
    ) -> Arc<SessionImpl> {
        Arc::new(SessionImpl::new(
            self.env.clone(),
            AuthContext::new(database, user_name, user_id),
            UserAuthenticator::None,
            (0, 0),
            Address::Tcp(SocketAddr::new(
                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
                6666,
            ))
            .into(),
            Default::default(),
        ))
    }
}

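/// Extracts the plan text from an `EXPLAIN` response, panicking if the
/// response is of any other statement type.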
pub async fn get_explain_output(mut rsp: RwPgResponse) -> String {
    if rsp.stmt_type() != StatementType::EXPLAIN {
        panic!("RESPONSE INVALID: {rsp:?}");
    }
    let mut res = String::new();
    #[for_await]
    for row_set in rsp.values_stream() {
        for row in row_set.unwrap() {
            let row: Row = row;
            let row = row.values()[0].as_ref().unwrap();
            res += std::str::from_utf8(row).unwrap();
            res += "\n";
        }
    }
    res
}

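/// A [`CatalogWriter`] that applies DDL directly to an in-memory [`Catalog`],
/// generating object ids from a local counter instead of going through meta.
/// Only the id-to-schema and schema-to-database mappings needed to resolve
/// later drops are tracked.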
pub struct MockCatalogWriter {
    catalog: Arc<RwLock<Catalog>>,
    id: AtomicU32,
    table_id_to_schema_id: RwLock<HashMap<u32, SchemaId>>,
    schema_id_to_database_id: RwLock<HashMap<u32, DatabaseId>>,
    hummock_snapshot_manager: HummockSnapshotManagerRef,
}

#[async_trait::async_trait]
impl CatalogWriter for MockCatalogWriter {
    async fn create_database(
        &self,
        db_name: &str,
        owner: UserId,
        resource_group: &str,
    ) -> Result<()> {
        let database_id = self.gen_id();
        self.catalog.write().create_database(&PbDatabase {
            name: db_name.to_owned(),
            id: database_id,
            owner,
            resource_group: resource_group.to_owned(),
        });
        self.create_schema(database_id, DEFAULT_SCHEMA_NAME, owner)
            .await?;
        self.create_schema(database_id, PG_CATALOG_SCHEMA_NAME, owner)
            .await?;
        self.create_schema(database_id, RW_CATALOG_SCHEMA_NAME, owner)
            .await?;
        Ok(())
    }

    async fn create_schema(
        &self,
        db_id: DatabaseId,
        schema_name: &str,
        owner: UserId,
    ) -> Result<()> {
        let id = self.gen_id();
        self.catalog.write().create_schema(&PbSchema {
            id,
            name: schema_name.to_owned(),
            database_id: db_id,
            owner,
        });
        self.add_schema_id(id, db_id);
        Ok(())
    }

    async fn create_materialized_view(
        &self,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
        _dependencies: HashSet<ObjectId>,
        _specific_resource_group: Option<String>,
    ) -> Result<()> {
        table.id = self.gen_id();
        table.stream_job_status = PbStreamJobStatus::Created as _;
        table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
        self.catalog.write().create_table(&table);
        self.add_table_or_source_id(table.id, table.schema_id, table.database_id);
        self.hummock_snapshot_manager
            .add_table_for_test(TableId::new(table.id));
        Ok(())
    }

    async fn create_view(&self, mut view: PbView) -> Result<()> {
        view.id = self.gen_id();
        self.catalog.write().create_view(&view);
        self.add_table_or_source_id(view.id, view.schema_id, view.database_id);
        Ok(())
    }

    async fn create_table(
        &self,
        source: Option<PbSource>,
        mut table: PbTable,
        graph: StreamFragmentGraph,
        _job_type: PbTableJobType,
    ) -> Result<()> {
        if let Some(source) = source {
            let source_id = self.create_source_inner(source)?;
            table.optional_associated_source_id =
                Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id));
        }
        self.create_materialized_view(table, graph, HashSet::new(), None)
            .await?;
        Ok(())
    }

    async fn replace_table(
        &self,
        _source: Option<PbSource>,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
        _mapping: ColIndexMapping,
        _job_type: TableJobType,
    ) -> Result<()> {
        table.stream_job_status = PbStreamJobStatus::Created as _;
        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
        self.catalog.write().update_table(&table);
        Ok(())
    }

    async fn replace_source(
        &self,
        source: PbSource,
        _graph: StreamFragmentGraph,
        _mapping: ColIndexMapping,
    ) -> Result<()> {
        self.catalog.write().update_source(&source);
        Ok(())
    }

    async fn create_source(
        &self,
        source: PbSource,
        _graph: Option<StreamFragmentGraph>,
    ) -> Result<()> {
        self.create_source_inner(source).map(|_| ())
    }

    async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        _affected_table_change: Option<ReplaceJobPlan>,
        _dependencies: HashSet<ObjectId>,
    ) -> Result<()> {
        self.create_sink_inner(sink, graph)
    }

    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
        self.create_subscription_inner(subscription)
    }

    async fn create_index(
        &self,
        mut index: PbIndex,
        mut index_table: PbTable,
        _graph: StreamFragmentGraph,
    ) -> Result<()> {
        index_table.id = self.gen_id();
        index_table.stream_job_status = PbStreamJobStatus::Created as _;
        index_table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
        self.catalog.write().create_table(&index_table);
        self.add_table_or_index_id(
            index_table.id,
            index_table.schema_id,
            index_table.database_id,
        );

        index.id = index_table.id;
        index.index_table_id = index_table.id;
        self.catalog.write().create_index(&index);
        Ok(())
    }

    async fn create_function(&self, _function: PbFunction) -> Result<()> {
        unreachable!()
    }

    async fn create_connection(
        &self,
        _connection_name: String,
        _database_id: u32,
        _schema_id: u32,
        _owner_id: u32,
        _connection: create_connection_request::Payload,
    ) -> Result<()> {
        unreachable!()
    }

    async fn create_secret(
        &self,
        _secret_name: String,
        _database_id: u32,
        _schema_id: u32,
        _owner_id: u32,
        _payload: Vec<u8>,
    ) -> Result<()> {
        unreachable!()
    }

    async fn comment_on(&self, _comment: PbComment) -> Result<()> {
        unreachable!()
    }

    async fn drop_table(
        &self,
        source_id: Option<u32>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        if let Some(source_id) = source_id {
            self.drop_table_or_source_id(source_id);
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.table_id);
        let indexes =
            self.catalog
                .read()
                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
        for index in indexes {
            self.drop_index(index.id, cascade).await?;
        }
        self.catalog
            .write()
            .drop_table(database_id, schema_id, table_id);
        if let Some(source_id) = source_id {
            self.catalog
                .write()
                .drop_source(database_id, schema_id, source_id);
        }
        Ok(())
    }

    async fn drop_view(&self, _view_id: u32, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.table_id);
        let indexes =
            self.catalog
                .read()
                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
        for index in indexes {
            self.drop_index(index.id, cascade).await?;
        }
        self.catalog
            .write()
            .drop_table(database_id, schema_id, table_id);
        Ok(())
    }

    async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(source_id);
        self.catalog
            .write()
            .drop_source(database_id, schema_id, source_id);
        Ok(())
    }

    async fn drop_sink(
        &self,
        sink_id: u32,
        cascade: bool,
        _target_table_change: Option<ReplaceJobPlan>,
    ) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_sink_id(sink_id);
        self.catalog
            .write()
            .drop_sink(database_id, schema_id, sink_id);
        Ok(())
    }

    async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_subscription_id(subscription_id);
        self.catalog
            .write()
            .drop_subscription(database_id, schema_id, subscription_id);
        Ok(())
    }

    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let &schema_id = self
            .table_id_to_schema_id
            .read()
            .get(&index_id.index_id)
            .unwrap();
        let database_id = self.get_database_id_by_schema(schema_id);

        let index = {
            let catalog_reader = self.catalog.read();
            let schema_catalog = catalog_reader
                .get_schema_by_id(&database_id, &schema_id)
                .unwrap();
            schema_catalog.get_index_by_id(&index_id).unwrap().clone()
        };

        let index_table_id = index.index_table.id;
        let (database_id, schema_id) = self.drop_table_or_index_id(index_id.index_id);
        self.catalog
            .write()
            .drop_index(database_id, schema_id, index_id);
        self.catalog
            .write()
            .drop_table(database_id, schema_id, index_table_id);
        Ok(())
    }

    async fn drop_function(&self, _function_id: FunctionId) -> Result<()> {
        unreachable!()
    }

    async fn drop_connection(&self, _connection_id: ConnectionId) -> Result<()> {
        unreachable!()
    }

    async fn drop_secret(&self, _secret_id: SecretId) -> Result<()> {
        unreachable!()
    }

    async fn drop_database(&self, database_id: u32) -> Result<()> {
        self.catalog.write().drop_database(database_id);
        Ok(())
    }

    async fn drop_schema(&self, schema_id: u32, _cascade: bool) -> Result<()> {
        let database_id = self.drop_schema_id(schema_id);
        self.catalog.write().drop_schema(database_id, schema_id);
        Ok(())
    }

    async fn alter_name(
        &self,
        object_id: alter_name_request::Object,
        object_name: &str,
    ) -> Result<()> {
        match object_id {
            alter_name_request::Object::TableId(table_id) => {
                self.catalog
                    .write()
                    .alter_table_name_by_id(&table_id.into(), object_name);
                Ok(())
            }
            _ => {
                unimplemented!()
            }
        }
    }

    async fn alter_source(&self, source: PbSource) -> Result<()> {
        self.catalog.write().update_source(&source);
        Ok(())
    }

    async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<()> {
        // Look up the target under the read lock, then release it before
        // taking the write lock: parking_lot locks are not reentrant, so
        // calling `write()` while the read guard is alive would deadlock.
        let pb_table = {
            let reader = self.catalog.read();
            let mut found = None;
            'search: for database in reader.iter_databases() {
                for schema in database.iter_schemas() {
                    match object {
                        Object::TableId(table_id) => {
                            if let Some(table) =
                                schema.get_created_table_by_id(&TableId::from(table_id))
                            {
                                let mut pb_table = table.to_prost();
                                pb_table.owner = owner_id;
                                found = Some(pb_table);
                                break 'search;
                            }
                        }
                        _ => unreachable!(),
                    }
                }
            }
            found
        };

        if let Some(pb_table) = pb_table {
            self.catalog.write().update_table(&pb_table);
            Ok(())
        } else {
            Err(ErrorCode::ItemNotFound(format!("object not found: {:?}", object)).into())
        }
    }

    async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: u32,
    ) -> Result<()> {
        match object {
            alter_set_schema_request::Object::TableId(table_id) => {
                let mut pb_table = {
                    let reader = self.catalog.read();
                    let table = reader.get_any_table_by_id(&table_id.into())?.to_owned();
                    table.to_prost()
                };
                pb_table.schema_id = new_schema_id;
                self.catalog.write().update_table(&pb_table);
                self.table_id_to_schema_id
                    .write()
                    .insert(table_id, new_schema_id);
                Ok(())
            }
            _ => unreachable!(),
        }
    }

    async fn alter_parallelism(
        &self,
        _table_id: u32,
        _parallelism: PbTableParallelism,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }

    async fn alter_swap_rename(&self, _object: alter_swap_rename_request::Object) -> Result<()> {
        todo!()
    }

    async fn alter_secret(
        &self,
        _secret_id: u32,
        _secret_name: String,
        _database_id: u32,
        _schema_id: u32,
        _owner_id: u32,
        _payload: Vec<u8>,
    ) -> Result<()> {
        unreachable!()
    }

    async fn alter_resource_group(
        &self,
        _table_id: u32,
        _resource_group: Option<String>,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }
}

impl MockCatalogWriter {
    pub fn new(
        catalog: Arc<RwLock<Catalog>>,
        hummock_snapshot_manager: HummockSnapshotManagerRef,
    ) -> Self {
        catalog.write().create_database(&PbDatabase {
            id: 0,
            name: DEFAULT_DATABASE_NAME.to_owned(),
            owner: DEFAULT_SUPER_USER_ID,
            resource_group: DEFAULT_RESOURCE_GROUP.to_owned(),
        });
        catalog.write().create_schema(&PbSchema {
            id: 1,
            name: DEFAULT_SCHEMA_NAME.to_owned(),
            database_id: 0,
            owner: DEFAULT_SUPER_USER_ID,
        });
        catalog.write().create_schema(&PbSchema {
            id: 2,
            name: PG_CATALOG_SCHEMA_NAME.to_owned(),
            database_id: 0,
            owner: DEFAULT_SUPER_USER_ID,
        });
        catalog.write().create_schema(&PbSchema {
            id: 3,
            name: RW_CATALOG_SCHEMA_NAME.to_owned(),
            database_id: 0,
            owner: DEFAULT_SUPER_USER_ID,
        });
        let mut map: HashMap<u32, DatabaseId> = HashMap::new();
        map.insert(1_u32, 0_u32);
        map.insert(2_u32, 0_u32);
        map.insert(3_u32, 0_u32);
        Self {
            catalog,
            id: AtomicU32::new(3),
            table_id_to_schema_id: Default::default(),
            schema_id_to_database_id: RwLock::new(map),
            hummock_snapshot_manager,
        }
    }

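    /// Returns a fresh object id. Ids 0..=3 are reserved for the default
    /// database and the three built-in schemas created in `new`, so the
    /// counter starts at 3 and the first generated id is 4.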
    fn gen_id(&self) -> u32 {
        self.id.fetch_add(1, Ordering::SeqCst) + 1
    }

    fn add_table_or_source_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn drop_table_or_source_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn add_table_or_sink_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn add_table_or_subscription_id(
        &self,
        table_id: u32,
        schema_id: SchemaId,
        _database_id: DatabaseId,
    ) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn add_table_or_index_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn drop_table_or_sink_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn drop_table_or_subscription_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn drop_table_or_index_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn add_schema_id(&self, schema_id: u32, database_id: DatabaseId) {
        self.schema_id_to_database_id
            .write()
            .insert(schema_id, database_id);
    }

    fn drop_schema_id(&self, schema_id: u32) -> DatabaseId {
        self.schema_id_to_database_id
            .write()
            .remove(&schema_id)
            .unwrap()
    }

    fn create_source_inner(&self, mut source: PbSource) -> Result<u32> {
        source.id = self.gen_id();
        self.catalog.write().create_source(&source);
        self.add_table_or_source_id(source.id, source.schema_id, source.database_id);
        Ok(source.id)
    }

    fn create_sink_inner(&self, mut sink: PbSink, _graph: StreamFragmentGraph) -> Result<()> {
        sink.id = self.gen_id();
        self.catalog.write().create_sink(&sink);
        self.add_table_or_sink_id(sink.id, sink.schema_id, sink.database_id);
        Ok(())
    }

    fn create_subscription_inner(&self, mut subscription: PbSubscription) -> Result<()> {
        subscription.id = self.gen_id();
        self.catalog.write().create_subscription(&subscription);
        self.add_table_or_subscription_id(
            subscription.id,
            subscription.schema_id,
            subscription.database_id,
        );
        Ok(())
    }

    fn get_database_id_by_schema(&self, schema_id: u32) -> DatabaseId {
        *self
            .schema_id_to_database_id
            .read()
            .get(&schema_id)
            .unwrap()
    }
}

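/// A [`UserInfoWriter`] backed by an in-memory [`UserInfoManager`], seeded
/// with the default superuser and generating user ids from a local counter.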
pub struct MockUserInfoWriter {
    id: AtomicU32,
    user_info: Arc<RwLock<UserInfoManager>>,
}

#[async_trait::async_trait]
impl UserInfoWriter for MockUserInfoWriter {
    async fn create_user(&self, user: UserInfo) -> Result<()> {
        let mut user = user;
        user.id = self.gen_id();
        self.user_info.write().create_user(user);
        Ok(())
    }

    async fn drop_user(&self, id: UserId) -> Result<()> {
        self.user_info.write().drop_user(id);
        Ok(())
    }

    async fn update_user(
        &self,
        update_user: UserInfo,
        update_fields: Vec<UpdateField>,
    ) -> Result<()> {
        let mut lock = self.user_info.write();
        let id = update_user.get_id();
        let Some(old_name) = lock.get_user_name_by_id(id) else {
            return Ok(());
        };
        let mut user_info = lock.get_user_by_name(&old_name).unwrap().to_prost();
        update_fields.into_iter().for_each(|field| match field {
            UpdateField::Super => user_info.is_super = update_user.is_super,
            UpdateField::Login => user_info.can_login = update_user.can_login,
            UpdateField::CreateDb => user_info.can_create_db = update_user.can_create_db,
            UpdateField::CreateUser => user_info.can_create_user = update_user.can_create_user,
            UpdateField::AuthInfo => user_info.auth_info.clone_from(&update_user.auth_info),
            UpdateField::Rename => user_info.name.clone_from(&update_user.name),
            UpdateField::Unspecified => unreachable!(),
        });
        // Persist the selectively-updated record rather than the raw request,
        // so that fields outside `update_fields` keep their old values.
        lock.update_user(user_info);
        Ok(())
    }

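    /// Grants `privileges` to each user, stamping every action with
    /// `with_grant_option` before it is stored.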
    async fn grant_privilege(
        &self,
        users: Vec<UserId>,
        privileges: Vec<GrantPrivilege>,
        with_grant_option: bool,
        _grantor: UserId,
    ) -> Result<()> {
        let privileges = privileges
            .into_iter()
            .map(|mut p| {
                p.action_with_opts
                    .iter_mut()
                    .for_each(|ao| ao.with_grant_option = with_grant_option);
                p
            })
            .collect::<Vec<_>>();
        for user_id in users {
            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
                u.extend_privileges(privileges.clone());
            }
        }
        Ok(())
    }

    async fn revoke_privilege(
        &self,
        users: Vec<UserId>,
        privileges: Vec<GrantPrivilege>,
        _granted_by: UserId,
        _revoke_by: UserId,
        revoke_grant_option: bool,
        _cascade: bool,
    ) -> Result<()> {
        for user_id in users {
            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
                u.revoke_privileges(privileges.clone(), revoke_grant_option);
            }
        }
        Ok(())
    }
}

impl MockUserInfoWriter {
    pub fn new(user_info: Arc<RwLock<UserInfoManager>>) -> Self {
        user_info.write().create_user(UserInfo {
            id: DEFAULT_SUPER_USER_ID,
            name: DEFAULT_SUPER_USER.to_owned(),
            is_super: true,
            can_create_db: true,
            can_create_user: true,
            can_login: true,
            ..Default::default()
        });
        Self {
            user_info,
            id: AtomicU32::new(NON_RESERVED_USER_ID as u32),
        }
    }

    fn gen_id(&self) -> u32 {
        self.id.fetch_add(1, Ordering::SeqCst)
    }
}

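/// A [`FrontendMetaClient`] stub: the RPCs that tests exercise return empty
/// or default values; the rest are left `unimplemented!()`.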
pub struct MockFrontendMetaClient {}

#[async_trait::async_trait]
impl FrontendMetaClient for MockFrontendMetaClient {
    async fn try_unregister(&self) {}

    async fn flush(&self, _database_id: DatabaseId) -> RpcResult<HummockVersionId> {
        Ok(INVALID_VERSION_ID)
    }

    async fn wait(&self) -> RpcResult<()> {
        Ok(())
    }

    async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult<Vec<u32>> {
        Ok(vec![])
    }

    async fn list_table_fragments(
        &self,
        _table_ids: &[u32],
    ) -> RpcResult<HashMap<u32, TableFragmentInfo>> {
        Ok(HashMap::default())
    }

    async fn list_streaming_job_states(&self) -> RpcResult<Vec<StreamingJobState>> {
        Ok(vec![])
    }

    async fn list_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
        Ok(vec![])
    }

    async fn list_actor_states(&self) -> RpcResult<Vec<ActorState>> {
        Ok(vec![])
    }

    async fn list_actor_splits(&self) -> RpcResult<Vec<ActorSplit>> {
        Ok(vec![])
    }

    async fn list_object_dependencies(&self) -> RpcResult<Vec<PbObjectDependencies>> {
        Ok(vec![])
    }

    async fn list_meta_snapshots(&self) -> RpcResult<Vec<MetaSnapshotMetadata>> {
        Ok(vec![])
    }

    async fn get_system_params(&self) -> RpcResult<SystemParamsReader> {
        Ok(SystemParams::default().into())
    }

    async fn set_system_param(
        &self,
        _param: String,
        _value: Option<String>,
    ) -> RpcResult<Option<SystemParamsReader>> {
        Ok(Some(SystemParams::default().into()))
    }

    async fn get_session_params(&self) -> RpcResult<SessionConfig> {
        Ok(Default::default())
    }

    async fn set_session_param(&self, _param: String, _value: Option<String>) -> RpcResult<String> {
        Ok("".to_owned())
    }

    async fn get_ddl_progress(&self) -> RpcResult<Vec<DdlProgress>> {
        Ok(vec![])
    }

    async fn get_tables(
        &self,
        _table_ids: &[u32],
        _include_dropped_tables: bool,
    ) -> RpcResult<HashMap<u32, Table>> {
        Ok(HashMap::new())
    }

    async fn list_hummock_pinned_versions(&self) -> RpcResult<Vec<(u32, u64)>> {
        unimplemented!()
    }

    async fn get_hummock_current_version(&self) -> RpcResult<HummockVersion> {
        Ok(HummockVersion::default())
    }

    async fn get_hummock_checkpoint_version(&self) -> RpcResult<HummockVersion> {
        unimplemented!()
    }

    async fn list_version_deltas(&self) -> RpcResult<Vec<HummockVersionDelta>> {
        unimplemented!()
    }

    async fn list_branched_objects(&self) -> RpcResult<Vec<BranchedObject>> {
        unimplemented!()
    }

    async fn list_hummock_compaction_group_configs(&self) -> RpcResult<Vec<CompactionGroupInfo>> {
        unimplemented!()
    }

    async fn list_hummock_active_write_limits(&self) -> RpcResult<HashMap<u64, WriteLimit>> {
        unimplemented!()
    }

    async fn list_hummock_meta_configs(&self) -> RpcResult<HashMap<String, String>> {
        unimplemented!()
    }

    async fn list_event_log(&self) -> RpcResult<Vec<EventLog>> {
        unimplemented!()
    }

    async fn list_compact_task_assignment(&self) -> RpcResult<Vec<CompactTaskAssignment>> {
        unimplemented!()
    }

    async fn list_all_nodes(&self) -> RpcResult<Vec<WorkerNode>> {
        Ok(vec![])
    }

    async fn list_compact_task_progress(&self) -> RpcResult<Vec<CompactTaskProgress>> {
        unimplemented!()
    }

    async fn recover(&self) -> RpcResult<()> {
        unimplemented!()
    }

    async fn apply_throttle(
        &self,
        _kind: PbThrottleTarget,
        _id: u32,
        _rate_limit: Option<u32>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn get_cluster_recovery_status(&self) -> RpcResult<RecoveryStatus> {
        Ok(RecoveryStatus::StatusRunning)
    }

    async fn get_cluster_limits(&self) -> RpcResult<Vec<ClusterLimit>> {
        Ok(vec![])
    }

    async fn list_rate_limits(&self) -> RpcResult<Vec<RateLimitInfo>> {
        Ok(vec![])
    }

    async fn get_meta_store_endpoint(&self) -> RpcResult<String> {
        unimplemented!()
    }
}

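/// Test proto schema: relative to `TestRecord`, `TestRecordAlterType` changes
/// field types and `TestRecordExt` adds a field, for exercising schema changes.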
#[cfg(test)]
pub static PROTO_FILE_DATA: &str = r#"
    syntax = "proto3";
    package test;
    message TestRecord {
        int32 id = 1;
        Country country = 3;
        int64 zipcode = 4;
        float rate = 5;
    }
    message TestRecordAlterType {
        string id = 1;
        Country country = 3;
        int32 zipcode = 4;
        float rate = 5;
    }
    message TestRecordExt {
        int32 id = 1;
        Country country = 3;
        int64 zipcode = 4;
        float rate = 5;
        string name = 6;
    }
    message Country {
        string address = 1;
        City city = 2;
        string zipcode = 3;
    }
    message City {
        string address = 1;
        string zipcode = 2;
    }"#;

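/// Writes `proto_data` to a temporary `.proto` file and compiles it with
/// `protoc` (which must be on `PATH`) into a file descriptor set. The returned
/// [`NamedTempFile`] holds the descriptor set and is deleted on drop.
///
/// A sketch of the intended use (the consuming statement is hypothetical):
///
/// ```ignore
/// let file = create_proto_file(PROTO_FILE_DATA);
/// let schema_location = format!("file://{}", file.path().display());
/// ```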
pub fn create_proto_file(proto_data: &str) -> NamedTempFile {
    let in_file = Builder::new()
        .prefix("temp")
        .suffix(".proto")
        .rand_bytes(8)
        .tempfile()
        .unwrap();

    let out_file = Builder::new()
        .prefix("temp")
        .suffix(".pb")
        .rand_bytes(8)
        .tempfile()
        .unwrap();

    let mut file = in_file.as_file();
    file.write_all(proto_data.as_ref())
        .expect("writing binary to test file");
    file.flush().expect("flush temp file failed");
    let include_path = in_file
        .path()
        .parent()
        .unwrap()
        .to_string_lossy()
        .into_owned();
    let out_path = out_file.path().to_string_lossy().into_owned();
    let in_path = in_file.path().to_string_lossy().into_owned();
    let mut compile = std::process::Command::new("protoc");

    let out = compile
        .arg("--include_imports")
        .arg("-I")
        .arg(include_path)
        .arg(format!("--descriptor_set_out={}", out_path))
        .arg(in_path)
        .output()
        .expect("failed to compile proto");
    if !out.status.success() {
        panic!("failed to compile proto\noutput: {:?}", out);
    }
    out_file
}