use std::collections::{BTreeMap, HashMap, HashSet};
use std::io::Write;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};

use futures_async_stream::for_await;
use parking_lot::RwLock;
use pgwire::net::{Address, AddressRef};
use pgwire::pg_response::StatementType;
use pgwire::pg_server::{BoxedError, SessionId, SessionManager, UserAuthenticator};
use pgwire::types::Row;
use risingwave_common::catalog::{
    AlterDatabaseParam, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER,
    DEFAULT_SUPER_USER_ID, FunctionId, IndexId, NON_RESERVED_USER_ID, ObjectId,
    PG_CATALOG_SCHEMA_NAME, RW_CATALOG_SCHEMA_NAME, TableId,
};
use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat};
use risingwave_common::session_config::SessionConfig;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::util::cluster_limit::ClusterLimit;
use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{HummockVersionId, INVALID_VERSION_ID};
use risingwave_pb::backup_service::MetaSnapshotMetadata;
use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
use risingwave_pb::catalog::{
    PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbStreamJobStatus,
    PbSubscription, PbTable, PbView, Table,
};
use risingwave_pb::common::WorkerNode;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::{
    DdlProgress, PbTableJobType, ReplaceJobPlan, TableJobType, alter_name_request,
    alter_set_schema_request, alter_swap_rename_request, create_connection_request,
};
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::{
    BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::{
    EventLog, FragmentDistribution, PbTableParallelism, PbThrottleTarget, RecoveryStatus,
    SystemParams,
};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_pb::user::update_user_request::UpdateField;
use risingwave_pb::user::{GrantPrivilege, UserInfo};
use risingwave_rpc_client::error::Result as RpcResult;
use tempfile::{Builder, NamedTempFile};

use crate::FrontendOpts;
use crate::catalog::catalog_service::CatalogWriter;
use crate::catalog::root_catalog::Catalog;
use crate::catalog::{ConnectionId, DatabaseId, SchemaId, SecretId};
use crate::error::{ErrorCode, Result};
use crate::handler::RwPgResponse;
use crate::meta_client::FrontendMetaClient;
use crate::scheduler::HummockSnapshotManagerRef;
use crate::session::{AuthContext, FrontendEnv, SessionImpl};
use crate::user::UserId;
use crate::user::user_manager::UserInfoManager;
use crate::user::user_service::UserInfoWriter;

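/// A test-only frontend that executes SQL statements against a mocked
/// [`FrontendEnv`], with no meta service or compute nodes attached.
///
/// A minimal usage sketch (assumes an async test context in which
/// `opts: FrontendOpts` has already been constructed):
///
/// ```ignore
/// let frontend = LocalFrontend::new(opts).await;
/// let response = frontend.run_sql("CREATE TABLE t (v1 INT)").await?;
/// ```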
pub struct LocalFrontend {
    pub opts: FrontendOpts,
    env: FrontendEnv,
}

impl SessionManager for LocalFrontend {
    type Session = SessionImpl;

    fn create_dummy_session(
        &self,
        _database_id: u32,
        _user_id: u32,
    ) -> std::result::Result<Arc<Self::Session>, BoxedError> {
        unreachable!()
    }

    fn connect(
        &self,
        _database: &str,
        _user_name: &str,
        _peer_addr: AddressRef,
    ) -> std::result::Result<Arc<Self::Session>, BoxedError> {
        Ok(self.session_ref())
    }

    fn cancel_queries_in_session(&self, _session_id: SessionId) {
        unreachable!()
    }

    fn cancel_creating_jobs_in_session(&self, _session_id: SessionId) {
        unreachable!()
    }

    fn end_session(&self, _session: &Self::Session) {
        unreachable!()
    }
}

impl LocalFrontend {
    #[expect(clippy::unused_async)]
    pub async fn new(opts: FrontendOpts) -> Self {
        let env = FrontendEnv::mock();
        Self { opts, env }
    }

    pub async fn run_sql(
        &self,
        sql: impl Into<String>,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        self.session_ref().run_statement(sql, vec![]).await
    }

    pub async fn run_sql_with_session(
        &self,
        session_ref: Arc<SessionImpl>,
        sql: impl Into<String>,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        session_ref.run_statement(sql, vec![]).await
    }

    pub async fn run_user_sql(
        &self,
        sql: impl Into<String>,
        database: String,
        user_name: String,
        user_id: UserId,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        self.session_user_ref(database, user_name, user_id)
            .run_statement(sql, vec![])
            .await
    }

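    /// Runs `sql` and collects every result row as its `Debug` representation.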
    pub async fn query_formatted_result(&self, sql: impl Into<String>) -> Vec<String> {
        let mut rsp = self.run_sql(sql).await.unwrap();
        let mut res = vec![];
        #[for_await]
        for row_set in rsp.values_stream() {
            for row in row_set.unwrap() {
                res.push(format!("{:?}", row));
            }
        }
        res
    }

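    /// Runs an `EXPLAIN` statement and concatenates its output rows into one
    /// newline-separated string. Panics if `sql` is not an `EXPLAIN` statement.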
    pub async fn get_explain_output(&self, sql: impl Into<String>) -> String {
        let mut rsp = self.run_sql(sql).await.unwrap();
        assert_eq!(rsp.stmt_type(), StatementType::EXPLAIN);
        let mut res = String::new();
        #[for_await]
        for row_set in rsp.values_stream() {
            for row in row_set.unwrap() {
                let row: Row = row;
                let row = row.values()[0].as_ref().unwrap();
                res += std::str::from_utf8(row).unwrap();
                res += "\n";
            }
        }
        res
    }

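    /// Returns a session on the default database as the default super user.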
    pub fn session_ref(&self) -> Arc<SessionImpl> {
        self.session_user_ref(
            DEFAULT_DATABASE_NAME.to_owned(),
            DEFAULT_SUPER_USER.to_owned(),
            DEFAULT_SUPER_USER_ID,
        )
    }

    pub fn session_user_ref(
        &self,
        database: String,
        user_name: String,
        user_id: UserId,
    ) -> Arc<SessionImpl> {
        Arc::new(SessionImpl::new(
            self.env.clone(),
            AuthContext::new(database, user_name, user_id),
            UserAuthenticator::None,
            // Dummy session id and peer address; irrelevant for tests.
            (0, 0),
            Address::Tcp(SocketAddr::new(
                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
                6666,
            ))
            .into(),
            Default::default(),
        ))
    }
}

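/// Collects the rows of an `EXPLAIN` response into a single newline-separated
/// string. Panics if the response is not from an `EXPLAIN` statement.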
pub async fn get_explain_output(mut rsp: RwPgResponse) -> String {
    if rsp.stmt_type() != StatementType::EXPLAIN {
        panic!("RESPONSE INVALID: {rsp:?}");
    }
    let mut res = String::new();
    #[for_await]
    for row_set in rsp.values_stream() {
        for row in row_set.unwrap() {
            let row: Row = row;
            let row = row.values()[0].as_ref().unwrap();
            res += std::str::from_utf8(row).unwrap();
            res += "\n";
        }
    }
    res
}

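/// An in-memory [`CatalogWriter`] that applies DDL directly to a shared
/// [`Catalog`], standing in for the meta service in tests. IDs are handed out
/// from a single atomic counter, and table-to-schema / schema-to-database
/// mappings are tracked locally so that drops can be routed back to the
/// owning schema and database.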
pub struct MockCatalogWriter {
    catalog: Arc<RwLock<Catalog>>,
    id: AtomicU32,
    table_id_to_schema_id: RwLock<HashMap<u32, SchemaId>>,
    schema_id_to_database_id: RwLock<HashMap<u32, DatabaseId>>,
    hummock_snapshot_manager: HummockSnapshotManagerRef,
}

#[async_trait::async_trait]
impl CatalogWriter for MockCatalogWriter {
    async fn create_database(
        &self,
        db_name: &str,
        owner: UserId,
        resource_group: &str,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
    ) -> Result<()> {
        let database_id = self.gen_id();
        self.catalog.write().create_database(&PbDatabase {
            name: db_name.to_owned(),
            id: database_id,
            owner,
            resource_group: resource_group.to_owned(),
            barrier_interval_ms,
            checkpoint_frequency,
        });
        self.create_schema(database_id, DEFAULT_SCHEMA_NAME, owner)
            .await?;
        self.create_schema(database_id, PG_CATALOG_SCHEMA_NAME, owner)
            .await?;
        self.create_schema(database_id, RW_CATALOG_SCHEMA_NAME, owner)
            .await?;
        Ok(())
    }

    async fn create_schema(
        &self,
        db_id: DatabaseId,
        schema_name: &str,
        owner: UserId,
    ) -> Result<()> {
        let id = self.gen_id();
        self.catalog.write().create_schema(&PbSchema {
            id,
            name: schema_name.to_owned(),
            database_id: db_id,
            owner,
        });
        self.add_schema_id(id, db_id);
        Ok(())
    }

    async fn create_materialized_view(
        &self,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
        _dependencies: HashSet<ObjectId>,
        _specific_resource_group: Option<String>,
        _if_not_exists: bool,
    ) -> Result<()> {
        table.id = self.gen_id();
        table.stream_job_status = PbStreamJobStatus::Created as _;
        table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
        self.catalog.write().create_table(&table);
        self.add_table_or_source_id(table.id, table.schema_id, table.database_id);
        self.hummock_snapshot_manager
            .add_table_for_test(TableId::new(table.id));
        Ok(())
    }

    async fn create_view(&self, mut view: PbView) -> Result<()> {
        view.id = self.gen_id();
        self.catalog.write().create_view(&view);
        self.add_table_or_source_id(view.id, view.schema_id, view.database_id);
        Ok(())
    }

    async fn create_table(
        &self,
        source: Option<PbSource>,
        mut table: PbTable,
        graph: StreamFragmentGraph,
        _job_type: PbTableJobType,
        if_not_exists: bool,
    ) -> Result<()> {
        if let Some(source) = source {
            let source_id = self.create_source_inner(source)?;
            table.optional_associated_source_id =
                Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id));
        }
        self.create_materialized_view(table, graph, HashSet::new(), None, if_not_exists)
            .await?;
        Ok(())
    }

    async fn replace_table(
        &self,
        _source: Option<PbSource>,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
        _job_type: TableJobType,
    ) -> Result<()> {
        table.stream_job_status = PbStreamJobStatus::Created as _;
        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
        self.catalog.write().update_table(&table);
        Ok(())
    }

    async fn replace_source(&self, source: PbSource, _graph: StreamFragmentGraph) -> Result<()> {
        self.catalog.write().update_source(&source);
        Ok(())
    }

    async fn create_source(
        &self,
        source: PbSource,
        _graph: Option<StreamFragmentGraph>,
        _if_not_exists: bool,
    ) -> Result<()> {
        self.create_source_inner(source).map(|_| ())
    }

    async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        _affected_table_change: Option<ReplaceJobPlan>,
        _dependencies: HashSet<ObjectId>,
        _if_not_exists: bool,
    ) -> Result<()> {
        self.create_sink_inner(sink, graph)
    }

    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
        self.create_subscription_inner(subscription)
    }

    async fn create_index(
        &self,
        mut index: PbIndex,
        mut index_table: PbTable,
        _graph: StreamFragmentGraph,
        _if_not_exists: bool,
    ) -> Result<()> {
        index_table.id = self.gen_id();
        index_table.stream_job_status = PbStreamJobStatus::Created as _;
        index_table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
        self.catalog.write().create_table(&index_table);
        self.add_table_or_index_id(
            index_table.id,
            index_table.schema_id,
            index_table.database_id,
        );

        index.id = index_table.id;
        index.index_table_id = index_table.id;
        self.catalog.write().create_index(&index);
        Ok(())
    }

    async fn create_function(&self, _function: PbFunction) -> Result<()> {
        unreachable!()
    }

    async fn create_connection(
        &self,
        _connection_name: String,
        _database_id: u32,
        _schema_id: u32,
        _owner_id: u32,
        _connection: create_connection_request::Payload,
    ) -> Result<()> {
        unreachable!()
    }

    async fn create_secret(
        &self,
        _secret_name: String,
        _database_id: u32,
        _schema_id: u32,
        _owner_id: u32,
        _payload: Vec<u8>,
    ) -> Result<()> {
        unreachable!()
    }

    async fn comment_on(&self, _comment: PbComment) -> Result<()> {
        unreachable!()
    }

    async fn drop_table(
        &self,
        source_id: Option<u32>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        if let Some(source_id) = source_id {
            self.drop_table_or_source_id(source_id);
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.table_id);
        let indexes =
            self.catalog
                .read()
                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
        for index in indexes {
            self.drop_index(index.id, cascade).await?;
        }
        self.catalog
            .write()
            .drop_table(database_id, schema_id, table_id);
        if let Some(source_id) = source_id {
            self.catalog
                .write()
                .drop_source(database_id, schema_id, source_id);
        }
        Ok(())
    }

    async fn drop_view(&self, _view_id: u32, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.table_id);
        let indexes =
            self.catalog
                .read()
                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
        for index in indexes {
            self.drop_index(index.id, cascade).await?;
        }
        self.catalog
            .write()
            .drop_table(database_id, schema_id, table_id);
        Ok(())
    }

    async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(source_id);
        self.catalog
            .write()
            .drop_source(database_id, schema_id, source_id);
        Ok(())
    }

    async fn drop_sink(
        &self,
        sink_id: u32,
        cascade: bool,
        _target_table_change: Option<ReplaceJobPlan>,
    ) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_sink_id(sink_id);
        self.catalog
            .write()
            .drop_sink(database_id, schema_id, sink_id);
        Ok(())
    }

    async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_subscription_id(subscription_id);
        self.catalog
            .write()
            .drop_subscription(database_id, schema_id, subscription_id);
        Ok(())
    }

    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let &schema_id = self
            .table_id_to_schema_id
            .read()
            .get(&index_id.index_id)
            .unwrap();
        let database_id = self.get_database_id_by_schema(schema_id);

        let index = {
            let catalog_reader = self.catalog.read();
            let schema_catalog = catalog_reader
                .get_schema_by_id(&database_id, &schema_id)
                .unwrap();
            schema_catalog.get_index_by_id(&index_id).unwrap().clone()
        };

        let index_table_id = index.index_table.id;
        let (database_id, schema_id) = self.drop_table_or_index_id(index_id.index_id);
        self.catalog
            .write()
            .drop_index(database_id, schema_id, index_id);
        self.catalog
            .write()
            .drop_table(database_id, schema_id, index_table_id);
        Ok(())
    }

    async fn drop_function(&self, _function_id: FunctionId) -> Result<()> {
        unreachable!()
    }

    async fn drop_connection(&self, _connection_id: ConnectionId) -> Result<()> {
        unreachable!()
    }

    async fn drop_secret(&self, _secret_id: SecretId) -> Result<()> {
        unreachable!()
    }

    async fn drop_database(&self, database_id: u32) -> Result<()> {
        self.catalog.write().drop_database(database_id);
        Ok(())
    }

    async fn drop_schema(&self, schema_id: u32, _cascade: bool) -> Result<()> {
        let database_id = self.drop_schema_id(schema_id);
        self.catalog.write().drop_schema(database_id, schema_id);
        Ok(())
    }

    async fn alter_name(
        &self,
        object_id: alter_name_request::Object,
        object_name: &str,
    ) -> Result<()> {
        match object_id {
            alter_name_request::Object::TableId(table_id) => {
                self.catalog
                    .write()
                    .alter_table_name_by_id(&table_id.into(), object_name);
                Ok(())
            }
            _ => {
                unimplemented!()
            }
        }
    }

    async fn alter_source(&self, source: PbSource) -> Result<()> {
        self.catalog.write().update_source(&source);
        Ok(())
    }

    async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<()> {
        for database in self.catalog.read().iter_databases() {
            for schema in database.iter_schemas() {
                match object {
                    Object::TableId(table_id) => {
                        if let Some(table) =
                            schema.get_created_table_by_id(&TableId::from(table_id))
                        {
                            let mut pb_table = table.to_prost();
                            pb_table.owner = owner_id;
                            self.catalog.write().update_table(&pb_table);
                            return Ok(());
                        }
                    }
                    _ => unreachable!(),
                }
            }
        }

        Err(ErrorCode::ItemNotFound(format!("object not found: {:?}", object)).into())
    }

    async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: u32,
    ) -> Result<()> {
        match object {
            alter_set_schema_request::Object::TableId(table_id) => {
                let mut pb_table = {
                    let reader = self.catalog.read();
                    let table = reader.get_any_table_by_id(&table_id.into())?.to_owned();
                    table.to_prost()
                };
                pb_table.schema_id = new_schema_id;
                self.catalog.write().update_table(&pb_table);
                self.table_id_to_schema_id
                    .write()
                    .insert(table_id, new_schema_id);
                Ok(())
            }
            _ => unreachable!(),
        }
    }

    async fn alter_parallelism(
        &self,
        _table_id: u32,
        _parallelism: PbTableParallelism,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }

    async fn alter_swap_rename(&self, _object: alter_swap_rename_request::Object) -> Result<()> {
        todo!()
    }

    async fn alter_secret(
        &self,
        _secret_id: u32,
        _secret_name: String,
        _database_id: u32,
        _schema_id: u32,
        _owner_id: u32,
        _payload: Vec<u8>,
    ) -> Result<()> {
        unreachable!()
    }

    async fn alter_resource_group(
        &self,
        _table_id: u32,
        _resource_group: Option<String>,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }

    async fn alter_database_param(
        &self,
        database_id: u32,
        param: AlterDatabaseParam,
    ) -> Result<()> {
        let mut pb_database = {
            let reader = self.catalog.read();
            let database = reader.get_database_by_id(&database_id)?.to_owned();
            database.to_prost()
        };
        match param {
            AlterDatabaseParam::BarrierIntervalMs(interval) => {
                pb_database.barrier_interval_ms = interval;
            }
            AlterDatabaseParam::CheckpointFrequency(frequency) => {
                pb_database.checkpoint_frequency = frequency;
            }
        }
        self.catalog.write().update_database(&pb_database);
        Ok(())
    }
}

impl MockCatalogWriter {
    pub fn new(
        catalog: Arc<RwLock<Catalog>>,
        hummock_snapshot_manager: HummockSnapshotManagerRef,
    ) -> Self {
        // Pre-register the default database (id 0) and its built-in schemas
        // (ids 1..=3), matching what the meta service would create on startup.
        catalog.write().create_database(&PbDatabase {
            id: 0,
            name: DEFAULT_DATABASE_NAME.to_owned(),
            owner: DEFAULT_SUPER_USER_ID,
            resource_group: DEFAULT_RESOURCE_GROUP.to_owned(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        });
        catalog.write().create_schema(&PbSchema {
            id: 1,
            name: DEFAULT_SCHEMA_NAME.to_owned(),
            database_id: 0,
            owner: DEFAULT_SUPER_USER_ID,
        });
        catalog.write().create_schema(&PbSchema {
            id: 2,
            name: PG_CATALOG_SCHEMA_NAME.to_owned(),
            database_id: 0,
            owner: DEFAULT_SUPER_USER_ID,
        });
        catalog.write().create_schema(&PbSchema {
            id: 3,
            name: RW_CATALOG_SCHEMA_NAME.to_owned(),
            database_id: 0,
            owner: DEFAULT_SUPER_USER_ID,
        });
        let mut map: HashMap<u32, DatabaseId> = HashMap::new();
        map.insert(1_u32, 0_u32);
        map.insert(2_u32, 0_u32);
        map.insert(3_u32, 0_u32);
        Self {
            catalog,
            id: AtomicU32::new(3),
            table_id_to_schema_id: Default::default(),
            schema_id_to_database_id: RwLock::new(map),
            hummock_snapshot_manager,
        }
    }

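    /// Returns a fresh ID. The counter starts at 3 because IDs 0..=3 are
    /// pre-allocated to the default database and its three built-in schemas.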
    fn gen_id(&self) -> u32 {
        // `fetch_add` returns the previous value, so add 1 to get the newly
        // reserved ID.
        self.id.fetch_add(1, Ordering::SeqCst) + 1
    }

    fn add_table_or_source_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn drop_table_or_source_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn add_table_or_sink_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn add_table_or_subscription_id(
        &self,
        table_id: u32,
        schema_id: SchemaId,
        _database_id: DatabaseId,
    ) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn add_table_or_index_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn drop_table_or_sink_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn drop_table_or_subscription_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn drop_table_or_index_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn add_schema_id(&self, schema_id: u32, database_id: DatabaseId) {
        self.schema_id_to_database_id
            .write()
            .insert(schema_id, database_id);
    }

    fn drop_schema_id(&self, schema_id: u32) -> DatabaseId {
        self.schema_id_to_database_id
            .write()
            .remove(&schema_id)
            .unwrap()
    }

    fn create_source_inner(&self, mut source: PbSource) -> Result<u32> {
        source.id = self.gen_id();
        self.catalog.write().create_source(&source);
        self.add_table_or_source_id(source.id, source.schema_id, source.database_id);
        Ok(source.id)
    }

    fn create_sink_inner(&self, mut sink: PbSink, _graph: StreamFragmentGraph) -> Result<()> {
        sink.id = self.gen_id();
        self.catalog.write().create_sink(&sink);
        self.add_table_or_sink_id(sink.id, sink.schema_id, sink.database_id);
        Ok(())
    }

    fn create_subscription_inner(&self, mut subscription: PbSubscription) -> Result<()> {
        subscription.id = self.gen_id();
        self.catalog.write().create_subscription(&subscription);
        self.add_table_or_subscription_id(
            subscription.id,
            subscription.schema_id,
            subscription.database_id,
        );
        Ok(())
    }

    fn get_database_id_by_schema(&self, schema_id: u32) -> DatabaseId {
        *self
            .schema_id_to_database_id
            .read()
            .get(&schema_id)
            .unwrap()
    }
}

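/// An in-memory [`UserInfoWriter`] backed by a shared [`UserInfoManager`],
/// pre-populated with the default super user.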
pub struct MockUserInfoWriter {
    id: AtomicU32,
    user_info: Arc<RwLock<UserInfoManager>>,
}

#[async_trait::async_trait]
impl UserInfoWriter for MockUserInfoWriter {
    async fn create_user(&self, mut user: UserInfo) -> Result<()> {
        user.id = self.gen_id();
        self.user_info.write().create_user(user);
        Ok(())
    }

    async fn drop_user(&self, id: UserId) -> Result<()> {
        self.user_info.write().drop_user(id);
        Ok(())
    }

    async fn update_user(
        &self,
        update_user: UserInfo,
        update_fields: Vec<UpdateField>,
    ) -> Result<()> {
        let mut lock = self.user_info.write();
        let id = update_user.get_id();
        let Some(old_name) = lock.get_user_name_by_id(id) else {
            return Ok(());
        };
        let mut user_info = lock.get_user_by_name(&old_name).unwrap().to_prost();
        update_fields.into_iter().for_each(|field| match field {
            UpdateField::Super => user_info.is_super = update_user.is_super,
            UpdateField::Login => user_info.can_login = update_user.can_login,
            UpdateField::CreateDb => user_info.can_create_db = update_user.can_create_db,
            UpdateField::CreateUser => user_info.can_create_user = update_user.can_create_user,
            UpdateField::AuthInfo => user_info.auth_info.clone_from(&update_user.auth_info),
            UpdateField::Rename => user_info.name.clone_from(&update_user.name),
            UpdateField::Unspecified => unreachable!(),
        });
        // Persist the merged user info rather than the raw request, so that
        // only the listed fields are changed.
        lock.update_user(user_info);
        Ok(())
    }

    async fn grant_privilege(
        &self,
        users: Vec<UserId>,
        privileges: Vec<GrantPrivilege>,
        with_grant_option: bool,
        _grantor: UserId,
    ) -> Result<()> {
        let privileges = privileges
            .into_iter()
            .map(|mut p| {
                p.action_with_opts
                    .iter_mut()
                    .for_each(|ao| ao.with_grant_option = with_grant_option);
                p
            })
            .collect::<Vec<_>>();
        for user_id in users {
            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
                u.extend_privileges(privileges.clone());
            }
        }
        Ok(())
    }

    async fn revoke_privilege(
        &self,
        users: Vec<UserId>,
        privileges: Vec<GrantPrivilege>,
        _granted_by: UserId,
        _revoke_by: UserId,
        revoke_grant_option: bool,
        _cascade: bool,
    ) -> Result<()> {
        for user_id in users {
            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
                u.revoke_privileges(privileges.clone(), revoke_grant_option);
            }
        }
        Ok(())
    }
}

impl MockUserInfoWriter {
    pub fn new(user_info: Arc<RwLock<UserInfoManager>>) -> Self {
        user_info.write().create_user(UserInfo {
            id: DEFAULT_SUPER_USER_ID,
            name: DEFAULT_SUPER_USER.to_owned(),
            is_super: true,
            can_create_db: true,
            can_create_user: true,
            can_login: true,
            ..Default::default()
        });
        Self {
            user_info,
            id: AtomicU32::new(NON_RESERVED_USER_ID as u32),
        }
    }

    fn gen_id(&self) -> u32 {
        self.id.fetch_add(1, Ordering::SeqCst)
    }
}

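/// A [`FrontendMetaClient`] stub: read-only queries return empty or default
/// values, and everything else is left `unimplemented!()`.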
pub struct MockFrontendMetaClient {}

#[async_trait::async_trait]
impl FrontendMetaClient for MockFrontendMetaClient {
    async fn try_unregister(&self) {}

    async fn flush(&self, _database_id: DatabaseId) -> RpcResult<HummockVersionId> {
        Ok(INVALID_VERSION_ID)
    }

    async fn wait(&self) -> RpcResult<()> {
        Ok(())
    }

    async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult<Vec<u32>> {
        Ok(vec![])
    }

    async fn list_table_fragments(
        &self,
        _table_ids: &[u32],
    ) -> RpcResult<HashMap<u32, TableFragmentInfo>> {
        Ok(HashMap::default())
    }

    async fn list_streaming_job_states(&self) -> RpcResult<Vec<StreamingJobState>> {
        Ok(vec![])
    }

    async fn list_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
        Ok(vec![])
    }

    async fn list_creating_stream_scan_fragment_distribution(
        &self,
    ) -> RpcResult<Vec<FragmentDistribution>> {
        Ok(vec![])
    }

    async fn list_actor_states(&self) -> RpcResult<Vec<ActorState>> {
        Ok(vec![])
    }

    async fn list_actor_splits(&self) -> RpcResult<Vec<ActorSplit>> {
        Ok(vec![])
    }

    async fn list_object_dependencies(&self) -> RpcResult<Vec<PbObjectDependencies>> {
        Ok(vec![])
    }

    async fn list_meta_snapshots(&self) -> RpcResult<Vec<MetaSnapshotMetadata>> {
        Ok(vec![])
    }

    async fn get_system_params(&self) -> RpcResult<SystemParamsReader> {
        Ok(SystemParams::default().into())
    }

    async fn set_system_param(
        &self,
        _param: String,
        _value: Option<String>,
    ) -> RpcResult<Option<SystemParamsReader>> {
        Ok(Some(SystemParams::default().into()))
    }

    async fn get_session_params(&self) -> RpcResult<SessionConfig> {
        Ok(Default::default())
    }

    async fn set_session_param(&self, _param: String, _value: Option<String>) -> RpcResult<String> {
        Ok("".to_owned())
    }

    async fn get_ddl_progress(&self) -> RpcResult<Vec<DdlProgress>> {
        Ok(vec![])
    }

    async fn get_tables(
        &self,
        _table_ids: &[u32],
        _include_dropped_tables: bool,
    ) -> RpcResult<HashMap<u32, Table>> {
        Ok(HashMap::new())
    }

    async fn list_hummock_pinned_versions(&self) -> RpcResult<Vec<(u32, u64)>> {
        unimplemented!()
    }

    async fn get_hummock_current_version(&self) -> RpcResult<HummockVersion> {
        Ok(HummockVersion::default())
    }

    async fn get_hummock_checkpoint_version(&self) -> RpcResult<HummockVersion> {
        unimplemented!()
    }

    async fn list_version_deltas(&self) -> RpcResult<Vec<HummockVersionDelta>> {
        unimplemented!()
    }

    async fn list_branched_objects(&self) -> RpcResult<Vec<BranchedObject>> {
        unimplemented!()
    }

    async fn list_hummock_compaction_group_configs(&self) -> RpcResult<Vec<CompactionGroupInfo>> {
        unimplemented!()
    }

    async fn list_hummock_active_write_limits(&self) -> RpcResult<HashMap<u64, WriteLimit>> {
        unimplemented!()
    }

    async fn list_hummock_meta_configs(&self) -> RpcResult<HashMap<String, String>> {
        unimplemented!()
    }

    async fn list_event_log(&self) -> RpcResult<Vec<EventLog>> {
        unimplemented!()
    }

    async fn list_compact_task_assignment(&self) -> RpcResult<Vec<CompactTaskAssignment>> {
        unimplemented!()
    }

    async fn list_all_nodes(&self) -> RpcResult<Vec<WorkerNode>> {
        Ok(vec![])
    }

    async fn list_compact_task_progress(&self) -> RpcResult<Vec<CompactTaskProgress>> {
        unimplemented!()
    }

    async fn recover(&self) -> RpcResult<()> {
        unimplemented!()
    }

    async fn apply_throttle(
        &self,
        _kind: PbThrottleTarget,
        _id: u32,
        _rate_limit: Option<u32>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn get_cluster_recovery_status(&self) -> RpcResult<RecoveryStatus> {
        Ok(RecoveryStatus::StatusRunning)
    }

    async fn get_cluster_limits(&self) -> RpcResult<Vec<ClusterLimit>> {
        Ok(vec![])
    }

    async fn list_rate_limits(&self) -> RpcResult<Vec<RateLimitInfo>> {
        Ok(vec![])
    }

    async fn get_meta_store_endpoint(&self) -> RpcResult<String> {
        unimplemented!()
    }

    async fn alter_sink_props(
        &self,
        _sink_id: u32,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
        _connector_conn_ref: Option<u32>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn list_hosted_iceberg_tables(&self) -> RpcResult<Vec<IcebergTable>> {
        unimplemented!()
    }

    async fn get_fragment_by_id(
        &self,
        _fragment_id: u32,
    ) -> RpcResult<Option<FragmentDistribution>> {
        unimplemented!()
    }

    fn worker_id(&self) -> u32 {
        0
    }
}

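/// Protobuf source shared by schema-handling tests. `TestRecordAlterType` and
/// `TestRecordExt` are variants of `TestRecord` with a changed field type and
/// an additional field, respectively, for exercising schema changes.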
#[cfg(test)]
pub static PROTO_FILE_DATA: &str = r#"
    syntax = "proto3";
    package test;
    message TestRecord {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
    }
    message TestRecordAlterType {
      string id = 1;
      Country country = 3;
      int32 zipcode = 4;
      float rate = 5;
    }
    message TestRecordExt {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
      string name = 6;
    }
    message Country {
      string address = 1;
      City city = 2;
      string zipcode = 3;
    }
    message City {
      string address = 1;
      string zipcode = 2;
    }"#;

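/// Writes `proto_data` to a temporary `.proto` file, compiles it with
/// `protoc` into a file descriptor set, and returns the resulting `.pb` temp
/// file. Requires `protoc` on `PATH` and panics if compilation fails.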
pub fn create_proto_file(proto_data: &str) -> NamedTempFile {
    // Write the proto source to a temporary `.proto` file.
    let in_file = Builder::new()
        .prefix("temp")
        .suffix(".proto")
        .rand_bytes(8)
        .tempfile()
        .unwrap();

    // The compiled file descriptor set is written to this `.pb` file.
    let out_file = Builder::new()
        .prefix("temp")
        .suffix(".pb")
        .rand_bytes(8)
        .tempfile()
        .unwrap();

    let mut file = in_file.as_file();
    file.write_all(proto_data.as_ref())
        .expect("writing binary to test file");
    file.flush().expect("flush temp file failed");
    let include_path = in_file
        .path()
        .parent()
        .unwrap()
        .to_string_lossy()
        .into_owned();
    let out_path = out_file.path().to_string_lossy().into_owned();
    let in_path = in_file.path().to_string_lossy().into_owned();
    let mut compile = std::process::Command::new("protoc");

    let out = compile
        .arg("--include_imports")
        .arg("-I")
        .arg(include_path)
        .arg(format!("--descriptor_set_out={}", out_path))
        .arg(in_path)
        .output()
        .expect("failed to compile proto");
    if !out.status.success() {
        panic!("compile proto failed \n output: {:?}", out);
    }
    out_file
}