1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::io::Write;
17use std::net::{IpAddr, Ipv4Addr, SocketAddr};
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU32, Ordering};
20
21use futures_async_stream::for_await;
22use parking_lot::RwLock;
23use pgwire::net::{Address, AddressRef};
24use pgwire::pg_response::StatementType;
25use pgwire::pg_server::{BoxedError, SessionId, SessionManager, UserAuthenticator};
26use pgwire::types::Row;
27use risingwave_common::catalog::{
28 AlterDatabaseParam, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER,
29 DEFAULT_SUPER_USER_ID, FunctionId, IndexId, NON_RESERVED_USER_ID, ObjectId,
30 PG_CATALOG_SCHEMA_NAME, RW_CATALOG_SCHEMA_NAME, TableId,
31};
32use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat};
33use risingwave_common::session_config::SessionConfig;
34use risingwave_common::system_param::reader::SystemParamsReader;
35use risingwave_common::util::cluster_limit::ClusterLimit;
36use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
37use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
38use risingwave_hummock_sdk::{HummockVersionId, INVALID_VERSION_ID};
39use risingwave_pb::backup_service::MetaSnapshotMetadata;
40use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
41use risingwave_pb::catalog::{
42 PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbStreamJobStatus,
43 PbSubscription, PbTable, PbView, Table,
44};
45use risingwave_pb::common::WorkerNode;
46use risingwave_pb::ddl_service::alter_owner_request::Object;
47use risingwave_pb::ddl_service::{
48 DdlProgress, PbTableJobType, ReplaceJobPlan, TableJobType, alter_name_request,
49 alter_set_schema_request, alter_swap_rename_request, create_connection_request,
50};
51use risingwave_pb::hummock::write_limits::WriteLimit;
52use risingwave_pb::hummock::{
53 BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
54};
55use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
56use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
57use risingwave_pb::meta::list_actor_states_response::ActorState;
58use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
59use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
60use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
61use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
62use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
63use risingwave_pb::meta::{
64 EventLog, FragmentDistribution, PbTableParallelism, PbThrottleTarget, RecoveryStatus,
65 SystemParams,
66};
67use risingwave_pb::secret::PbSecretRef;
68use risingwave_pb::stream_plan::StreamFragmentGraph;
69use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
70use risingwave_pb::user::update_user_request::UpdateField;
71use risingwave_pb::user::{GrantPrivilege, UserInfo};
72use risingwave_rpc_client::error::Result as RpcResult;
73use tempfile::{Builder, NamedTempFile};
74
75use crate::FrontendOpts;
76use crate::catalog::catalog_service::CatalogWriter;
77use crate::catalog::root_catalog::Catalog;
78use crate::catalog::{ConnectionId, DatabaseId, SchemaId, SecretId};
79use crate::error::{ErrorCode, Result};
80use crate::handler::RwPgResponse;
81use crate::meta_client::FrontendMetaClient;
82use crate::scheduler::HummockSnapshotManagerRef;
83use crate::session::{AuthContext, FrontendEnv, SessionImpl};
84use crate::user::UserId;
85use crate::user::user_manager::UserInfoManager;
86use crate::user::user_service::UserInfoWriter;
87
/// Frontend handle used to run SQL in-process.
///
/// It bundles the [`FrontendOpts`] it was created with and the [`FrontendEnv`] that is
/// cloned into every session this frontend hands out.
pub struct LocalFrontend {
    /// Options supplied when the frontend was constructed.
    pub opts: FrontendOpts,
    /// Environment shared (by clone) with every created session.
    env: FrontendEnv,
}
93
/// [`SessionManager`] implementation for [`LocalFrontend`].
///
/// Only `connect` is functional; every other method is an `unreachable!()` stub and
/// panics if it is ever invoked.
impl SessionManager for LocalFrontend {
    type Session = SessionImpl;

    /// Not supported by the local frontend; panics via `unreachable!()`.
    fn create_dummy_session(
        &self,
        _database_id: u32,
        _user_name: u32,
    ) -> std::result::Result<Arc<Self::Session>, BoxedError> {
        unreachable!()
    }

    /// Returns a new session from [`LocalFrontend::session_ref`].
    ///
    /// The requested database, user name and peer address are all ignored.
    fn connect(
        &self,
        _database: &str,
        _user_name: &str,
        _peer_addr: AddressRef,
    ) -> std::result::Result<Arc<Self::Session>, BoxedError> {
        Ok(self.session_ref())
    }

    /// Not supported by the local frontend; panics via `unreachable!()`.
    fn cancel_queries_in_session(&self, _session_id: SessionId) {
        unreachable!()
    }

    /// Not supported by the local frontend; panics via `unreachable!()`.
    fn cancel_creating_jobs_in_session(&self, _session_id: SessionId) {
        unreachable!()
    }

    /// Not supported by the local frontend; panics via `unreachable!()`.
    fn end_session(&self, _session: &Self::Session) {
        unreachable!()
    }
}
126
127impl LocalFrontend {
128 #[expect(clippy::unused_async)]
129 pub async fn new(opts: FrontendOpts) -> Self {
130 let env = FrontendEnv::mock();
131 Self { opts, env }
132 }
133
134 pub async fn run_sql(
135 &self,
136 sql: impl Into<String>,
137 ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
138 let sql: Arc<str> = Arc::from(sql.into());
139 self.session_ref().run_statement(sql, vec![]).await
140 }
141
142 pub async fn run_sql_with_session(
143 &self,
144 session_ref: Arc<SessionImpl>,
145 sql: impl Into<String>,
146 ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
147 let sql: Arc<str> = Arc::from(sql.into());
148 session_ref.run_statement(sql, vec![]).await
149 }
150
151 pub async fn run_user_sql(
152 &self,
153 sql: impl Into<String>,
154 database: String,
155 user_name: String,
156 user_id: UserId,
157 ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
158 let sql: Arc<str> = Arc::from(sql.into());
159 self.session_user_ref(database, user_name, user_id)
160 .run_statement(sql, vec![])
161 .await
162 }
163
164 pub async fn query_formatted_result(&self, sql: impl Into<String>) -> Vec<String> {
165 let mut rsp = self.run_sql(sql).await.unwrap();
166 let mut res = vec![];
167 #[for_await]
168 for row_set in rsp.values_stream() {
169 for row in row_set.unwrap() {
170 res.push(format!("{:?}", row));
171 }
172 }
173 res
174 }
175
176 pub async fn get_explain_output(&self, sql: impl Into<String>) -> String {
177 let mut rsp = self.run_sql(sql).await.unwrap();
178 assert_eq!(rsp.stmt_type(), StatementType::EXPLAIN);
179 let mut res = String::new();
180 #[for_await]
181 for row_set in rsp.values_stream() {
182 for row in row_set.unwrap() {
183 let row: Row = row;
184 let row = row.values()[0].as_ref().unwrap();
185 res += std::str::from_utf8(row).unwrap();
186 res += "\n";
187 }
188 }
189 res
190 }
191
192 pub fn session_ref(&self) -> Arc<SessionImpl> {
194 self.session_user_ref(
195 DEFAULT_DATABASE_NAME.to_owned(),
196 DEFAULT_SUPER_USER.to_owned(),
197 DEFAULT_SUPER_USER_ID,
198 )
199 }
200
201 pub fn session_user_ref(
202 &self,
203 database: String,
204 user_name: String,
205 user_id: UserId,
206 ) -> Arc<SessionImpl> {
207 Arc::new(SessionImpl::new(
208 self.env.clone(),
209 AuthContext::new(database, user_name, user_id),
210 UserAuthenticator::None,
211 (0, 0),
213 Address::Tcp(SocketAddr::new(
214 IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
215 6666,
216 ))
217 .into(),
218 Default::default(),
219 ))
220 }
221}
222
223pub async fn get_explain_output(mut rsp: RwPgResponse) -> String {
224 if rsp.stmt_type() != StatementType::EXPLAIN {
225 panic!("RESPONSE INVALID: {rsp:?}");
226 }
227 let mut res = String::new();
228 #[for_await]
229 for row_set in rsp.values_stream() {
230 for row in row_set.unwrap() {
231 let row: Row = row;
232 let row = row.values()[0].as_ref().unwrap();
233 res += std::str::from_utf8(row).unwrap();
234 res += "\n";
235 }
236 }
237 res
238}
239
/// Catalog writer that applies changes directly to a shared in-memory [`Catalog`].
pub struct MockCatalogWriter {
    /// Catalog that every operation reads from and writes to.
    catalog: Arc<RwLock<Catalog>>,
    /// Counter backing `gen_id`, the source of ids for newly created objects.
    id: AtomicU32,
    /// Schema of each tracked object id (tables, views, sources, sinks, subscriptions
    /// and index tables all share this map).
    table_id_to_schema_id: RwLock<HashMap<u32, SchemaId>>,
    /// Database that owns each known schema.
    schema_id_to_database_id: RwLock<HashMap<u32, DatabaseId>>,
    /// Notified (via `add_table_for_test`) when a table or materialized view is created.
    hummock_snapshot_manager: HummockSnapshotManagerRef,
}
247
248#[async_trait::async_trait]
249impl CatalogWriter for MockCatalogWriter {
250 async fn create_database(
251 &self,
252 db_name: &str,
253 owner: UserId,
254 resource_group: &str,
255 barrier_interval_ms: Option<u32>,
256 checkpoint_frequency: Option<u64>,
257 ) -> Result<()> {
258 let database_id = self.gen_id();
259 self.catalog.write().create_database(&PbDatabase {
260 name: db_name.to_owned(),
261 id: database_id,
262 owner,
263 resource_group: resource_group.to_owned(),
264 barrier_interval_ms,
265 checkpoint_frequency,
266 });
267 self.create_schema(database_id, DEFAULT_SCHEMA_NAME, owner)
268 .await?;
269 self.create_schema(database_id, PG_CATALOG_SCHEMA_NAME, owner)
270 .await?;
271 self.create_schema(database_id, RW_CATALOG_SCHEMA_NAME, owner)
272 .await?;
273 Ok(())
274 }
275
276 async fn create_schema(
277 &self,
278 db_id: DatabaseId,
279 schema_name: &str,
280 owner: UserId,
281 ) -> Result<()> {
282 let id = self.gen_id();
283 self.catalog.write().create_schema(&PbSchema {
284 id,
285 name: schema_name.to_owned(),
286 database_id: db_id,
287 owner,
288 });
289 self.add_schema_id(id, db_id);
290 Ok(())
291 }
292
293 async fn create_materialized_view(
294 &self,
295 mut table: PbTable,
296 _graph: StreamFragmentGraph,
297 _dependencies: HashSet<ObjectId>,
298 _specific_resource_group: Option<String>,
299 _if_not_exists: bool,
300 ) -> Result<()> {
301 table.id = self.gen_id();
302 table.stream_job_status = PbStreamJobStatus::Created as _;
303 table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
304 self.catalog.write().create_table(&table);
305 self.add_table_or_source_id(table.id, table.schema_id, table.database_id);
306 self.hummock_snapshot_manager
307 .add_table_for_test(TableId::new(table.id));
308 Ok(())
309 }
310
311 async fn replace_materialized_view(
312 &self,
313 mut table: PbTable,
314 _graph: StreamFragmentGraph,
315 ) -> Result<()> {
316 table.stream_job_status = PbStreamJobStatus::Created as _;
317 assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
318 self.catalog.write().update_table(&table);
319 Ok(())
320 }
321
322 async fn create_view(&self, mut view: PbView) -> Result<()> {
323 view.id = self.gen_id();
324 self.catalog.write().create_view(&view);
325 self.add_table_or_source_id(view.id, view.schema_id, view.database_id);
326 Ok(())
327 }
328
329 async fn create_table(
330 &self,
331 source: Option<PbSource>,
332 mut table: PbTable,
333 graph: StreamFragmentGraph,
334 _job_type: PbTableJobType,
335 if_not_exists: bool,
336 ) -> Result<()> {
337 if let Some(source) = source {
338 let source_id = self.create_source_inner(source)?;
339 table.optional_associated_source_id =
340 Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id));
341 }
342 self.create_materialized_view(table, graph, HashSet::new(), None, if_not_exists)
343 .await?;
344 Ok(())
345 }
346
347 async fn replace_table(
348 &self,
349 _source: Option<PbSource>,
350 mut table: PbTable,
351 _graph: StreamFragmentGraph,
352 _job_type: TableJobType,
353 ) -> Result<()> {
354 table.stream_job_status = PbStreamJobStatus::Created as _;
355 assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
356 self.catalog.write().update_table(&table);
357 Ok(())
358 }
359
360 async fn replace_source(&self, source: PbSource, _graph: StreamFragmentGraph) -> Result<()> {
361 self.catalog.write().update_source(&source);
362 Ok(())
363 }
364
365 async fn create_source(
366 &self,
367 source: PbSource,
368 _graph: Option<StreamFragmentGraph>,
369 _if_not_exists: bool,
370 ) -> Result<()> {
371 self.create_source_inner(source).map(|_| ())
372 }
373
374 async fn create_sink(
375 &self,
376 sink: PbSink,
377 graph: StreamFragmentGraph,
378 _affected_table_change: Option<ReplaceJobPlan>,
379 _dependencies: HashSet<ObjectId>,
380 _if_not_exists: bool,
381 ) -> Result<()> {
382 self.create_sink_inner(sink, graph)
383 }
384
385 async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
386 self.create_subscription_inner(subscription)
387 }
388
389 async fn create_index(
390 &self,
391 mut index: PbIndex,
392 mut index_table: PbTable,
393 _graph: StreamFragmentGraph,
394 _if_not_exists: bool,
395 ) -> Result<()> {
396 index_table.id = self.gen_id();
397 index_table.stream_job_status = PbStreamJobStatus::Created as _;
398 index_table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
399 self.catalog.write().create_table(&index_table);
400 self.add_table_or_index_id(
401 index_table.id,
402 index_table.schema_id,
403 index_table.database_id,
404 );
405
406 index.id = index_table.id;
407 index.index_table_id = index_table.id;
408 self.catalog.write().create_index(&index);
409 Ok(())
410 }
411
412 async fn create_function(&self, _function: PbFunction) -> Result<()> {
413 unreachable!()
414 }
415
416 async fn create_connection(
417 &self,
418 _connection_name: String,
419 _database_id: u32,
420 _schema_id: u32,
421 _owner_id: u32,
422 _connection: create_connection_request::Payload,
423 ) -> Result<()> {
424 unreachable!()
425 }
426
427 async fn create_secret(
428 &self,
429 _secret_name: String,
430 _database_id: u32,
431 _schema_id: u32,
432 _owner_id: u32,
433 _payload: Vec<u8>,
434 ) -> Result<()> {
435 unreachable!()
436 }
437
438 async fn comment_on(&self, _comment: PbComment) -> Result<()> {
439 unreachable!()
440 }
441
442 async fn drop_table(
443 &self,
444 source_id: Option<u32>,
445 table_id: TableId,
446 cascade: bool,
447 ) -> Result<()> {
448 if cascade {
449 return Err(ErrorCode::NotSupported(
450 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
451 "use drop instead".to_owned(),
452 )
453 .into());
454 }
455 if let Some(source_id) = source_id {
456 self.drop_table_or_source_id(source_id);
457 }
458 let (database_id, schema_id) = self.drop_table_or_source_id(table_id.table_id);
459 let indexes =
460 self.catalog
461 .read()
462 .get_all_indexes_related_to_object(database_id, schema_id, table_id);
463 for index in indexes {
464 self.drop_index(index.id, cascade).await?;
465 }
466 self.catalog
467 .write()
468 .drop_table(database_id, schema_id, table_id);
469 if let Some(source_id) = source_id {
470 self.catalog
471 .write()
472 .drop_source(database_id, schema_id, source_id);
473 }
474 Ok(())
475 }
476
477 async fn drop_view(&self, _view_id: u32, _cascade: bool) -> Result<()> {
478 unreachable!()
479 }
480
481 async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
482 if cascade {
483 return Err(ErrorCode::NotSupported(
484 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
485 "use drop instead".to_owned(),
486 )
487 .into());
488 }
489 let (database_id, schema_id) = self.drop_table_or_source_id(table_id.table_id);
490 let indexes =
491 self.catalog
492 .read()
493 .get_all_indexes_related_to_object(database_id, schema_id, table_id);
494 for index in indexes {
495 self.drop_index(index.id, cascade).await?;
496 }
497 self.catalog
498 .write()
499 .drop_table(database_id, schema_id, table_id);
500 Ok(())
501 }
502
503 async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<()> {
504 if cascade {
505 return Err(ErrorCode::NotSupported(
506 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
507 "use drop instead".to_owned(),
508 )
509 .into());
510 }
511 let (database_id, schema_id) = self.drop_table_or_source_id(source_id);
512 self.catalog
513 .write()
514 .drop_source(database_id, schema_id, source_id);
515 Ok(())
516 }
517
518 async fn drop_sink(
519 &self,
520 sink_id: u32,
521 cascade: bool,
522 _target_table_change: Option<ReplaceJobPlan>,
523 ) -> Result<()> {
524 if cascade {
525 return Err(ErrorCode::NotSupported(
526 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
527 "use drop instead".to_owned(),
528 )
529 .into());
530 }
531 let (database_id, schema_id) = self.drop_table_or_sink_id(sink_id);
532 self.catalog
533 .write()
534 .drop_sink(database_id, schema_id, sink_id);
535 Ok(())
536 }
537
538 async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()> {
539 if cascade {
540 return Err(ErrorCode::NotSupported(
541 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
542 "use drop instead".to_owned(),
543 )
544 .into());
545 }
546 let (database_id, schema_id) = self.drop_table_or_subscription_id(subscription_id);
547 self.catalog
548 .write()
549 .drop_subscription(database_id, schema_id, subscription_id);
550 Ok(())
551 }
552
553 async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
554 if cascade {
555 return Err(ErrorCode::NotSupported(
556 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
557 "use drop instead".to_owned(),
558 )
559 .into());
560 }
561 let &schema_id = self
562 .table_id_to_schema_id
563 .read()
564 .get(&index_id.index_id)
565 .unwrap();
566 let database_id = self.get_database_id_by_schema(schema_id);
567
568 let index = {
569 let catalog_reader = self.catalog.read();
570 let schema_catalog = catalog_reader
571 .get_schema_by_id(&database_id, &schema_id)
572 .unwrap();
573 schema_catalog.get_index_by_id(&index_id).unwrap().clone()
574 };
575
576 let index_table_id = index.index_table.id;
577 let (database_id, schema_id) = self.drop_table_or_index_id(index_id.index_id);
578 self.catalog
579 .write()
580 .drop_index(database_id, schema_id, index_id);
581 self.catalog
582 .write()
583 .drop_table(database_id, schema_id, index_table_id);
584 Ok(())
585 }
586
587 async fn drop_function(&self, _function_id: FunctionId) -> Result<()> {
588 unreachable!()
589 }
590
591 async fn drop_connection(&self, _connection_id: ConnectionId) -> Result<()> {
592 unreachable!()
593 }
594
595 async fn drop_secret(&self, _secret_id: SecretId) -> Result<()> {
596 unreachable!()
597 }
598
599 async fn drop_database(&self, database_id: u32) -> Result<()> {
600 self.catalog.write().drop_database(database_id);
601 Ok(())
602 }
603
604 async fn drop_schema(&self, schema_id: u32, _cascade: bool) -> Result<()> {
605 let database_id = self.drop_schema_id(schema_id);
606 self.catalog.write().drop_schema(database_id, schema_id);
607 Ok(())
608 }
609
610 async fn alter_name(
611 &self,
612 object_id: alter_name_request::Object,
613 object_name: &str,
614 ) -> Result<()> {
615 match object_id {
616 alter_name_request::Object::TableId(table_id) => {
617 self.catalog
618 .write()
619 .alter_table_name_by_id(&table_id.into(), object_name);
620 Ok(())
621 }
622 _ => {
623 unimplemented!()
624 }
625 }
626 }
627
628 async fn alter_source(&self, source: PbSource) -> Result<()> {
629 self.catalog.write().update_source(&source);
630 Ok(())
631 }
632
633 async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<()> {
634 for database in self.catalog.read().iter_databases() {
635 for schema in database.iter_schemas() {
636 match object {
637 Object::TableId(table_id) => {
638 if let Some(table) =
639 schema.get_created_table_by_id(&TableId::from(table_id))
640 {
641 let mut pb_table = table.to_prost();
642 pb_table.owner = owner_id;
643 self.catalog.write().update_table(&pb_table);
644 return Ok(());
645 }
646 }
647 _ => unreachable!(),
648 }
649 }
650 }
651
652 Err(ErrorCode::ItemNotFound(format!("object not found: {:?}", object)).into())
653 }
654
655 async fn alter_set_schema(
656 &self,
657 object: alter_set_schema_request::Object,
658 new_schema_id: u32,
659 ) -> Result<()> {
660 match object {
661 alter_set_schema_request::Object::TableId(table_id) => {
662 let mut pb_table = {
663 let reader = self.catalog.read();
664 let table = reader.get_any_table_by_id(&table_id.into())?.to_owned();
665 table.to_prost()
666 };
667 pb_table.schema_id = new_schema_id;
668 self.catalog.write().update_table(&pb_table);
669 self.table_id_to_schema_id
670 .write()
671 .insert(table_id, new_schema_id);
672 Ok(())
673 }
674 _ => unreachable!(),
675 }
676 }
677
678 async fn alter_parallelism(
679 &self,
680 _table_id: u32,
681 _parallelism: PbTableParallelism,
682 _deferred: bool,
683 ) -> Result<()> {
684 todo!()
685 }
686
687 async fn alter_swap_rename(&self, _object: alter_swap_rename_request::Object) -> Result<()> {
688 todo!()
689 }
690
691 async fn alter_secret(
692 &self,
693 _secret_id: u32,
694 _secret_name: String,
695 _database_id: u32,
696 _schema_id: u32,
697 _owner_id: u32,
698 _payload: Vec<u8>,
699 ) -> Result<()> {
700 unreachable!()
701 }
702
703 async fn alter_resource_group(
704 &self,
705 _table_id: u32,
706 _resource_group: Option<String>,
707 _deferred: bool,
708 ) -> Result<()> {
709 todo!()
710 }
711
712 async fn alter_database_param(
713 &self,
714 database_id: u32,
715 param: AlterDatabaseParam,
716 ) -> Result<()> {
717 let mut pb_database = {
718 let reader = self.catalog.read();
719 let database = reader.get_database_by_id(&database_id)?.to_owned();
720 database.to_prost()
721 };
722 match param {
723 AlterDatabaseParam::BarrierIntervalMs(interval) => {
724 pb_database.barrier_interval_ms = interval;
725 }
726 AlterDatabaseParam::CheckpointFrequency(frequency) => {
727 pb_database.checkpoint_frequency = frequency;
728 }
729 }
730 self.catalog.write().update_database(&pb_database);
731 Ok(())
732 }
733}
734
735impl MockCatalogWriter {
736 pub fn new(
737 catalog: Arc<RwLock<Catalog>>,
738 hummock_snapshot_manager: HummockSnapshotManagerRef,
739 ) -> Self {
740 catalog.write().create_database(&PbDatabase {
741 id: 0,
742 name: DEFAULT_DATABASE_NAME.to_owned(),
743 owner: DEFAULT_SUPER_USER_ID,
744 resource_group: DEFAULT_RESOURCE_GROUP.to_owned(),
745 barrier_interval_ms: None,
746 checkpoint_frequency: None,
747 });
748 catalog.write().create_schema(&PbSchema {
749 id: 1,
750 name: DEFAULT_SCHEMA_NAME.to_owned(),
751 database_id: 0,
752 owner: DEFAULT_SUPER_USER_ID,
753 });
754 catalog.write().create_schema(&PbSchema {
755 id: 2,
756 name: PG_CATALOG_SCHEMA_NAME.to_owned(),
757 database_id: 0,
758 owner: DEFAULT_SUPER_USER_ID,
759 });
760 catalog.write().create_schema(&PbSchema {
761 id: 3,
762 name: RW_CATALOG_SCHEMA_NAME.to_owned(),
763 database_id: 0,
764 owner: DEFAULT_SUPER_USER_ID,
765 });
766 let mut map: HashMap<u32, DatabaseId> = HashMap::new();
767 map.insert(1_u32, 0_u32);
768 map.insert(2_u32, 0_u32);
769 map.insert(3_u32, 0_u32);
770 Self {
771 catalog,
772 id: AtomicU32::new(3),
773 table_id_to_schema_id: Default::default(),
774 schema_id_to_database_id: RwLock::new(map),
775 hummock_snapshot_manager,
776 }
777 }
778
779 fn gen_id(&self) -> u32 {
780 self.id.fetch_add(1, Ordering::SeqCst) + 1
782 }
783
784 fn add_table_or_source_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
785 self.table_id_to_schema_id
786 .write()
787 .insert(table_id, schema_id);
788 }
789
790 fn drop_table_or_source_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
791 let schema_id = self
792 .table_id_to_schema_id
793 .write()
794 .remove(&table_id)
795 .unwrap();
796 (self.get_database_id_by_schema(schema_id), schema_id)
797 }
798
799 fn add_table_or_sink_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
800 self.table_id_to_schema_id
801 .write()
802 .insert(table_id, schema_id);
803 }
804
805 fn add_table_or_subscription_id(
806 &self,
807 table_id: u32,
808 schema_id: SchemaId,
809 _database_id: DatabaseId,
810 ) {
811 self.table_id_to_schema_id
812 .write()
813 .insert(table_id, schema_id);
814 }
815
816 fn add_table_or_index_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
817 self.table_id_to_schema_id
818 .write()
819 .insert(table_id, schema_id);
820 }
821
822 fn drop_table_or_sink_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
823 let schema_id = self
824 .table_id_to_schema_id
825 .write()
826 .remove(&table_id)
827 .unwrap();
828 (self.get_database_id_by_schema(schema_id), schema_id)
829 }
830
831 fn drop_table_or_subscription_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
832 let schema_id = self
833 .table_id_to_schema_id
834 .write()
835 .remove(&table_id)
836 .unwrap();
837 (self.get_database_id_by_schema(schema_id), schema_id)
838 }
839
840 fn drop_table_or_index_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
841 let schema_id = self
842 .table_id_to_schema_id
843 .write()
844 .remove(&table_id)
845 .unwrap();
846 (self.get_database_id_by_schema(schema_id), schema_id)
847 }
848
849 fn add_schema_id(&self, schema_id: u32, database_id: DatabaseId) {
850 self.schema_id_to_database_id
851 .write()
852 .insert(schema_id, database_id);
853 }
854
855 fn drop_schema_id(&self, schema_id: u32) -> DatabaseId {
856 self.schema_id_to_database_id
857 .write()
858 .remove(&schema_id)
859 .unwrap()
860 }
861
862 fn create_source_inner(&self, mut source: PbSource) -> Result<u32> {
863 source.id = self.gen_id();
864 self.catalog.write().create_source(&source);
865 self.add_table_or_source_id(source.id, source.schema_id, source.database_id);
866 Ok(source.id)
867 }
868
869 fn create_sink_inner(&self, mut sink: PbSink, _graph: StreamFragmentGraph) -> Result<()> {
870 sink.id = self.gen_id();
871 self.catalog.write().create_sink(&sink);
872 self.add_table_or_sink_id(sink.id, sink.schema_id, sink.database_id);
873 Ok(())
874 }
875
876 fn create_subscription_inner(&self, mut subscription: PbSubscription) -> Result<()> {
877 subscription.id = self.gen_id();
878 self.catalog.write().create_subscription(&subscription);
879 self.add_table_or_subscription_id(
880 subscription.id,
881 subscription.schema_id,
882 subscription.database_id,
883 );
884 Ok(())
885 }
886
887 fn get_database_id_by_schema(&self, schema_id: u32) -> DatabaseId {
888 *self
889 .schema_id_to_database_id
890 .read()
891 .get(&schema_id)
892 .unwrap()
893 }
894}
895
/// User-info writer that applies changes directly to a shared [`UserInfoManager`].
pub struct MockUserInfoWriter {
    /// Counter backing `gen_id`, the source of ids for newly created users.
    id: AtomicU32,
    /// User registry that every operation reads from and writes to.
    user_info: Arc<RwLock<UserInfoManager>>,
}
900
901#[async_trait::async_trait]
902impl UserInfoWriter for MockUserInfoWriter {
903 async fn create_user(&self, user: UserInfo) -> Result<()> {
904 let mut user = user;
905 user.id = self.gen_id();
906 self.user_info.write().create_user(user);
907 Ok(())
908 }
909
910 async fn drop_user(&self, id: UserId) -> Result<()> {
911 self.user_info.write().drop_user(id);
912 Ok(())
913 }
914
915 async fn update_user(
916 &self,
917 update_user: UserInfo,
918 update_fields: Vec<UpdateField>,
919 ) -> Result<()> {
920 let mut lock = self.user_info.write();
921 let id = update_user.get_id();
922 let Some(old_name) = lock.get_user_name_by_id(id) else {
923 return Ok(());
924 };
925 let mut user_info = lock.get_user_by_name(&old_name).unwrap().to_prost();
926 update_fields.into_iter().for_each(|field| match field {
927 UpdateField::Super => user_info.is_super = update_user.is_super,
928 UpdateField::Login => user_info.can_login = update_user.can_login,
929 UpdateField::CreateDb => user_info.can_create_db = update_user.can_create_db,
930 UpdateField::CreateUser => user_info.can_create_user = update_user.can_create_user,
931 UpdateField::AuthInfo => user_info.auth_info.clone_from(&update_user.auth_info),
932 UpdateField::Rename => user_info.name.clone_from(&update_user.name),
933 UpdateField::Unspecified => unreachable!(),
934 });
935 lock.update_user(update_user);
936 Ok(())
937 }
938
939 async fn grant_privilege(
942 &self,
943 users: Vec<UserId>,
944 privileges: Vec<GrantPrivilege>,
945 with_grant_option: bool,
946 _grantor: UserId,
947 ) -> Result<()> {
948 let privileges = privileges
949 .into_iter()
950 .map(|mut p| {
951 p.action_with_opts
952 .iter_mut()
953 .for_each(|ao| ao.with_grant_option = with_grant_option);
954 p
955 })
956 .collect::<Vec<_>>();
957 for user_id in users {
958 if let Some(u) = self.user_info.write().get_user_mut(user_id) {
959 u.extend_privileges(privileges.clone());
960 }
961 }
962 Ok(())
963 }
964
965 async fn revoke_privilege(
968 &self,
969 users: Vec<UserId>,
970 privileges: Vec<GrantPrivilege>,
971 _granted_by: UserId,
972 _revoke_by: UserId,
973 revoke_grant_option: bool,
974 _cascade: bool,
975 ) -> Result<()> {
976 for user_id in users {
977 if let Some(u) = self.user_info.write().get_user_mut(user_id) {
978 u.revoke_privileges(privileges.clone(), revoke_grant_option);
979 }
980 }
981 Ok(())
982 }
983
984 async fn alter_default_privilege(
985 &self,
986 _users: Vec<UserId>,
987 _database_id: DatabaseId,
988 _schemas: Vec<SchemaId>,
989 _operation: AlterDefaultPrivilegeOperation,
990 _operated_by: UserId,
991 ) -> Result<()> {
992 todo!()
993 }
994}
995
996impl MockUserInfoWriter {
997 pub fn new(user_info: Arc<RwLock<UserInfoManager>>) -> Self {
998 user_info.write().create_user(UserInfo {
999 id: DEFAULT_SUPER_USER_ID,
1000 name: DEFAULT_SUPER_USER.to_owned(),
1001 is_super: true,
1002 can_create_db: true,
1003 can_create_user: true,
1004 can_login: true,
1005 ..Default::default()
1006 });
1007 Self {
1008 user_info,
1009 id: AtomicU32::new(NON_RESERVED_USER_ID as u32),
1010 }
1011 }
1012
1013 fn gen_id(&self) -> u32 {
1014 self.id.fetch_add(1, Ordering::SeqCst)
1015 }
1016}
1017
/// Stateless stand-in for the frontend's meta client.
pub struct MockFrontendMetaClient {}
1019
1020#[async_trait::async_trait]
1021impl FrontendMetaClient for MockFrontendMetaClient {
1022 async fn try_unregister(&self) {}
1023
1024 async fn flush(&self, _database_id: DatabaseId) -> RpcResult<HummockVersionId> {
1025 Ok(INVALID_VERSION_ID)
1026 }
1027
1028 async fn wait(&self) -> RpcResult<()> {
1029 Ok(())
1030 }
1031
1032 async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult<Vec<u32>> {
1033 Ok(vec![])
1034 }
1035
1036 async fn list_table_fragments(
1037 &self,
1038 _table_ids: &[u32],
1039 ) -> RpcResult<HashMap<u32, TableFragmentInfo>> {
1040 Ok(HashMap::default())
1041 }
1042
1043 async fn list_streaming_job_states(&self) -> RpcResult<Vec<StreamingJobState>> {
1044 Ok(vec![])
1045 }
1046
1047 async fn list_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
1048 Ok(vec![])
1049 }
1050
1051 async fn list_creating_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
1052 Ok(vec![])
1053 }
1054
1055 async fn list_actor_states(&self) -> RpcResult<Vec<ActorState>> {
1056 Ok(vec![])
1057 }
1058
1059 async fn list_actor_splits(&self) -> RpcResult<Vec<ActorSplit>> {
1060 Ok(vec![])
1061 }
1062
1063 async fn list_object_dependencies(&self) -> RpcResult<Vec<PbObjectDependencies>> {
1064 Ok(vec![])
1065 }
1066
1067 async fn list_meta_snapshots(&self) -> RpcResult<Vec<MetaSnapshotMetadata>> {
1068 Ok(vec![])
1069 }
1070
1071 async fn set_system_param(
1072 &self,
1073 _param: String,
1074 _value: Option<String>,
1075 ) -> RpcResult<Option<SystemParamsReader>> {
1076 Ok(Some(SystemParams::default().into()))
1077 }
1078
1079 async fn get_session_params(&self) -> RpcResult<SessionConfig> {
1080 Ok(Default::default())
1081 }
1082
1083 async fn set_session_param(&self, _param: String, _value: Option<String>) -> RpcResult<String> {
1084 Ok("".to_owned())
1085 }
1086
1087 async fn get_ddl_progress(&self) -> RpcResult<Vec<DdlProgress>> {
1088 Ok(vec![])
1089 }
1090
1091 async fn get_tables(
1092 &self,
1093 _table_ids: &[u32],
1094 _include_dropped_tables: bool,
1095 ) -> RpcResult<HashMap<u32, Table>> {
1096 Ok(HashMap::new())
1097 }
1098
1099 async fn list_hummock_pinned_versions(&self) -> RpcResult<Vec<(u32, u64)>> {
1100 unimplemented!()
1101 }
1102
1103 async fn get_hummock_current_version(&self) -> RpcResult<HummockVersion> {
1104 Ok(HummockVersion::default())
1105 }
1106
1107 async fn get_hummock_checkpoint_version(&self) -> RpcResult<HummockVersion> {
1108 unimplemented!()
1109 }
1110
1111 async fn list_version_deltas(&self) -> RpcResult<Vec<HummockVersionDelta>> {
1112 unimplemented!()
1113 }
1114
1115 async fn list_branched_objects(&self) -> RpcResult<Vec<BranchedObject>> {
1116 unimplemented!()
1117 }
1118
1119 async fn list_hummock_compaction_group_configs(&self) -> RpcResult<Vec<CompactionGroupInfo>> {
1120 unimplemented!()
1121 }
1122
1123 async fn list_hummock_active_write_limits(&self) -> RpcResult<HashMap<u64, WriteLimit>> {
1124 unimplemented!()
1125 }
1126
1127 async fn list_hummock_meta_configs(&self) -> RpcResult<HashMap<String, String>> {
1128 unimplemented!()
1129 }
1130
1131 async fn list_event_log(&self) -> RpcResult<Vec<EventLog>> {
1132 unimplemented!()
1133 }
1134
1135 async fn list_compact_task_assignment(&self) -> RpcResult<Vec<CompactTaskAssignment>> {
1136 unimplemented!()
1137 }
1138
1139 async fn list_all_nodes(&self) -> RpcResult<Vec<WorkerNode>> {
1140 Ok(vec![])
1141 }
1142
1143 async fn list_compact_task_progress(&self) -> RpcResult<Vec<CompactTaskProgress>> {
1144 unimplemented!()
1145 }
1146
1147 async fn recover(&self) -> RpcResult<()> {
1148 unimplemented!()
1149 }
1150
1151 async fn apply_throttle(
1152 &self,
1153 _kind: PbThrottleTarget,
1154 _id: u32,
1155 _rate_limit: Option<u32>,
1156 ) -> RpcResult<()> {
1157 unimplemented!()
1158 }
1159
1160 async fn get_cluster_recovery_status(&self) -> RpcResult<RecoveryStatus> {
1161 Ok(RecoveryStatus::StatusRunning)
1162 }
1163
1164 async fn get_cluster_limits(&self) -> RpcResult<Vec<ClusterLimit>> {
1165 Ok(vec![])
1166 }
1167
1168 async fn list_rate_limits(&self) -> RpcResult<Vec<RateLimitInfo>> {
1169 Ok(vec![])
1170 }
1171
1172 async fn get_meta_store_endpoint(&self) -> RpcResult<String> {
1173 unimplemented!()
1174 }
1175
1176 async fn alter_sink_props(
1177 &self,
1178 _sink_id: u32,
1179 _changed_props: BTreeMap<String, String>,
1180 _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1181 _connector_conn_ref: Option<u32>,
1182 ) -> RpcResult<()> {
1183 unimplemented!()
1184 }
1185
1186 async fn alter_source_connector_props(
1187 &self,
1188 _source_id: u32,
1189 _changed_props: BTreeMap<String, String>,
1190 _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1191 _connector_conn_ref: Option<u32>,
1192 ) -> RpcResult<()> {
1193 unimplemented!()
1194 }
1195
1196 async fn list_hosted_iceberg_tables(&self) -> RpcResult<Vec<IcebergTable>> {
1197 unimplemented!()
1198 }
1199
1200 async fn get_fragment_by_id(
1201 &self,
1202 _fragment_id: u32,
1203 ) -> RpcResult<Option<FragmentDistribution>> {
1204 unimplemented!()
1205 }
1206
1207 fn worker_id(&self) -> u32 {
1208 0
1209 }
1210
1211 async fn set_sync_log_store_aligned(&self, _job_id: u32, _aligned: bool) -> RpcResult<()> {
1212 Ok(())
1213 }
1214}
1215
/// Protobuf (`proto3`) schema text for package `test`, compiled only under `cfg(test)`.
///
/// It declares the messages `TestRecord`, `TestRecordAlterType`, `TestRecordExt`,
/// `Country` and `City`. `TestRecordAlterType` differs from `TestRecord` in the types of
/// `id` and `zipcode`; `TestRecordExt` adds a `name` field.
// NOTE(review): presumably meant as input for `create_proto_file` — confirm at call sites.
#[cfg(test)]
pub static PROTO_FILE_DATA: &str = r#"
 syntax = "proto3";
 package test;
 message TestRecord {
 int32 id = 1;
 Country country = 3;
 int64 zipcode = 4;
 float rate = 5;
 }
 message TestRecordAlterType {
 string id = 1;
 Country country = 3;
 int32 zipcode = 4;
 float rate = 5;
 }
 message TestRecordExt {
 int32 id = 1;
 Country country = 3;
 int64 zipcode = 4;
 float rate = 5;
 string name = 6;
 }
 message Country {
 string address = 1;
 City city = 2;
 string zipcode = 3;
 }
 message City {
 string address = 1;
 string zipcode = 2;
 }"#;
1248
1249pub fn create_proto_file(proto_data: &str) -> NamedTempFile {
1252 let in_file = Builder::new()
1253 .prefix("temp")
1254 .suffix(".proto")
1255 .rand_bytes(8)
1256 .tempfile()
1257 .unwrap();
1258
1259 let out_file = Builder::new()
1260 .prefix("temp")
1261 .suffix(".pb")
1262 .rand_bytes(8)
1263 .tempfile()
1264 .unwrap();
1265
1266 let mut file = in_file.as_file();
1267 file.write_all(proto_data.as_ref())
1268 .expect("writing binary to test file");
1269 file.flush().expect("flush temp file failed");
1270 let include_path = in_file
1271 .path()
1272 .parent()
1273 .unwrap()
1274 .to_string_lossy()
1275 .into_owned();
1276 let out_path = out_file.path().to_string_lossy().into_owned();
1277 let in_path = in_file.path().to_string_lossy().into_owned();
1278 let mut compile = std::process::Command::new("protoc");
1279
1280 let out = compile
1281 .arg("--include_imports")
1282 .arg("-I")
1283 .arg(include_path)
1284 .arg(format!("--descriptor_set_out={}", out_path))
1285 .arg(in_path)
1286 .output()
1287 .expect("failed to compile proto");
1288 if !out.status.success() {
1289 panic!("compile proto failed \n output: {:?}", out);
1290 }
1291 out_file
1292}