1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::io::Write;
17use std::net::{IpAddr, Ipv4Addr, SocketAddr};
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU32, Ordering};
20
21use futures_async_stream::for_await;
22use parking_lot::RwLock;
23use pgwire::net::{Address, AddressRef};
24use pgwire::pg_response::StatementType;
25use pgwire::pg_server::{BoxedError, SessionId, SessionManager, UserAuthenticator};
26use pgwire::types::Row;
27use risingwave_common::catalog::{
28 AlterDatabaseParam, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER,
29 DEFAULT_SUPER_USER_FOR_ADMIN, DEFAULT_SUPER_USER_FOR_ADMIN_ID, DEFAULT_SUPER_USER_ID,
30 FunctionId, IndexId, NON_RESERVED_USER_ID, ObjectId, PG_CATALOG_SCHEMA_NAME,
31 RW_CATALOG_SCHEMA_NAME, TableId,
32};
33use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat};
34use risingwave_common::id::{ConnectionId, JobId, SourceId, SubscriptionId, ViewId, WorkerId};
35use risingwave_common::session_config::SessionConfig;
36use risingwave_common::system_param::reader::SystemParamsReader;
37use risingwave_common::util::cluster_limit::ClusterLimit;
38use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
39use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
40use risingwave_hummock_sdk::{HummockVersionId, INVALID_VERSION_ID};
41use risingwave_pb::backup_service::MetaSnapshotMetadata;
42use risingwave_pb::catalog::{
43 PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbStreamJobStatus,
44 PbSubscription, PbTable, PbView, Table,
45};
46use risingwave_pb::common::WorkerNode;
47use risingwave_pb::ddl_service::alter_owner_request::Object;
48use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
49use risingwave_pb::ddl_service::{
50 DdlProgress, PbTableJobType, TableJobType, alter_name_request, alter_set_schema_request,
51 alter_swap_rename_request, create_connection_request,
52};
53use risingwave_pb::hummock::write_limits::WriteLimit;
54use risingwave_pb::hummock::{
55 BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
56};
57use risingwave_pb::id::ActorId;
58use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
59use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
60use risingwave_pb::meta::list_actor_states_response::ActorState;
61use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
62use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
63use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
64use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
65use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
66use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
67use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
68use risingwave_pb::meta::{
69 EventLog, FragmentDistribution, PbTableParallelism, PbThrottleTarget, RecoveryStatus,
70 RefreshRequest, RefreshResponse, SystemParams,
71};
72use risingwave_pb::secret::PbSecretRef;
73use risingwave_pb::stream_plan::StreamFragmentGraph;
74use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
75use risingwave_pb::user::update_user_request::UpdateField;
76use risingwave_pb::user::{GrantPrivilege, UserInfo};
77use risingwave_rpc_client::error::Result as RpcResult;
78use tempfile::{Builder, NamedTempFile};
79
80use crate::FrontendOpts;
81use crate::catalog::catalog_service::CatalogWriter;
82use crate::catalog::root_catalog::Catalog;
83use crate::catalog::{DatabaseId, FragmentId, SchemaId, SecretId, SinkId};
84use crate::error::{ErrorCode, Result};
85use crate::handler::RwPgResponse;
86use crate::meta_client::FrontendMetaClient;
87use crate::scheduler::HummockSnapshotManagerRef;
88use crate::session::{AuthContext, FrontendEnv, SessionImpl};
89use crate::user::UserId;
90use crate::user::user_manager::UserInfoManager;
91use crate::user::user_service::UserInfoWriter;
92
/// Frontend handle that owns a [`FrontendEnv`] and creates sessions on top of it.
///
/// `LocalFrontend::new` fills `env` with `FrontendEnv::mock()`.
pub struct LocalFrontend {
    /// Options this frontend was constructed with.
    pub opts: FrontendOpts,
    /// Environment cloned into every session created by `session_user_ref`.
    env: FrontendEnv,
}
98
99impl SessionManager for LocalFrontend {
100 type Session = SessionImpl;
101
102 fn create_dummy_session(
103 &self,
104 _database_id: DatabaseId,
105 _user_name: u32,
106 ) -> std::result::Result<Arc<Self::Session>, BoxedError> {
107 unreachable!()
108 }
109
110 fn connect(
111 &self,
112 _database: &str,
113 _user_name: &str,
114 _peer_addr: AddressRef,
115 ) -> std::result::Result<Arc<Self::Session>, BoxedError> {
116 Ok(self.session_ref())
117 }
118
119 fn cancel_queries_in_session(&self, _session_id: SessionId) {
120 unreachable!()
121 }
122
123 fn cancel_creating_jobs_in_session(&self, _session_id: SessionId) {
124 unreachable!()
125 }
126
127 fn end_session(&self, _session: &Self::Session) {
128 unreachable!()
129 }
130}
131
132impl LocalFrontend {
133 #[expect(clippy::unused_async)]
134 pub async fn new(opts: FrontendOpts) -> Self {
135 let env = FrontendEnv::mock();
136 Self { opts, env }
137 }
138
139 pub async fn run_sql(
140 &self,
141 sql: impl Into<String>,
142 ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
143 let sql: Arc<str> = Arc::from(sql.into());
144 self.session_ref().run_statement(sql, vec![]).await
145 }
146
147 pub async fn run_sql_with_session(
148 &self,
149 session_ref: Arc<SessionImpl>,
150 sql: impl Into<String>,
151 ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
152 let sql: Arc<str> = Arc::from(sql.into());
153 session_ref.run_statement(sql, vec![]).await
154 }
155
156 pub async fn run_user_sql(
157 &self,
158 sql: impl Into<String>,
159 database: String,
160 user_name: String,
161 user_id: UserId,
162 ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
163 let sql: Arc<str> = Arc::from(sql.into());
164 self.session_user_ref(database, user_name, user_id)
165 .run_statement(sql, vec![])
166 .await
167 }
168
169 pub async fn query_formatted_result(&self, sql: impl Into<String>) -> Vec<String> {
170 let mut rsp = self.run_sql(sql).await.unwrap();
171 let mut res = vec![];
172 #[for_await]
173 for row_set in rsp.values_stream() {
174 for row in row_set.unwrap() {
175 res.push(format!("{:?}", row));
176 }
177 }
178 res
179 }
180
181 pub async fn get_explain_output(&self, sql: impl Into<String>) -> String {
182 let mut rsp = self.run_sql(sql).await.unwrap();
183 assert_eq!(rsp.stmt_type(), StatementType::EXPLAIN);
184 let mut res = String::new();
185 #[for_await]
186 for row_set in rsp.values_stream() {
187 for row in row_set.unwrap() {
188 let row: Row = row;
189 let row = row.values()[0].as_ref().unwrap();
190 res += std::str::from_utf8(row).unwrap();
191 res += "\n";
192 }
193 }
194 res
195 }
196
197 pub fn session_ref(&self) -> Arc<SessionImpl> {
199 self.session_user_ref(
200 DEFAULT_DATABASE_NAME.to_owned(),
201 DEFAULT_SUPER_USER.to_owned(),
202 DEFAULT_SUPER_USER_ID,
203 )
204 }
205
206 pub fn session_user_ref(
207 &self,
208 database: String,
209 user_name: String,
210 user_id: UserId,
211 ) -> Arc<SessionImpl> {
212 Arc::new(SessionImpl::new(
213 self.env.clone(),
214 AuthContext::new(database, user_name, user_id),
215 UserAuthenticator::None,
216 (0, 0),
218 Address::Tcp(SocketAddr::new(
219 IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
220 6666,
221 ))
222 .into(),
223 Default::default(),
224 ))
225 }
226}
227
228pub async fn get_explain_output(mut rsp: RwPgResponse) -> String {
229 if rsp.stmt_type() != StatementType::EXPLAIN {
230 panic!("RESPONSE INVALID: {rsp:?}");
231 }
232 let mut res = String::new();
233 #[for_await]
234 for row_set in rsp.values_stream() {
235 for row in row_set.unwrap() {
236 let row: Row = row;
237 let row = row.values()[0].as_ref().unwrap();
238 res += std::str::from_utf8(row).unwrap();
239 res += "\n";
240 }
241 }
242 res
243}
244
/// Catalog writer that applies every change directly to a shared in-memory [`Catalog`].
pub struct MockCatalogWriter {
    /// Catalog that all create/alter/drop operations mutate.
    catalog: Arc<RwLock<Catalog>>,
    /// Last id handed out; `gen_id` returns this value plus one.
    id: AtomicU32,
    /// Raw object id -> owning schema, recorded when tables, views, sources, sinks,
    /// subscriptions and index tables are created.
    table_id_to_schema_id: RwLock<HashMap<u32, SchemaId>>,
    /// Schema -> owning database, recorded when schemas are created.
    schema_id_to_database_id: RwLock<HashMap<SchemaId, DatabaseId>>,
    /// Told about every table created through `create_materialized_view`.
    hummock_snapshot_manager: HummockSnapshotManagerRef,
}
252
253#[async_trait::async_trait]
254impl CatalogWriter for MockCatalogWriter {
255 async fn create_database(
256 &self,
257 db_name: &str,
258 owner: UserId,
259 resource_group: &str,
260 barrier_interval_ms: Option<u32>,
261 checkpoint_frequency: Option<u64>,
262 ) -> Result<()> {
263 let database_id = DatabaseId::new(self.gen_id());
264 self.catalog.write().create_database(&PbDatabase {
265 name: db_name.to_owned(),
266 id: database_id,
267 owner,
268 resource_group: resource_group.to_owned(),
269 barrier_interval_ms,
270 checkpoint_frequency,
271 });
272 self.create_schema(database_id, DEFAULT_SCHEMA_NAME, owner)
273 .await?;
274 self.create_schema(database_id, PG_CATALOG_SCHEMA_NAME, owner)
275 .await?;
276 self.create_schema(database_id, RW_CATALOG_SCHEMA_NAME, owner)
277 .await?;
278 Ok(())
279 }
280
281 async fn create_schema(
282 &self,
283 db_id: DatabaseId,
284 schema_name: &str,
285 owner: UserId,
286 ) -> Result<()> {
287 let id = self.gen_id();
288 self.catalog.write().create_schema(&PbSchema {
289 id,
290 name: schema_name.to_owned(),
291 database_id: db_id,
292 owner,
293 });
294 self.add_schema_id(id, db_id);
295 Ok(())
296 }
297
298 async fn create_materialized_view(
299 &self,
300 mut table: PbTable,
301 _graph: StreamFragmentGraph,
302 _dependencies: HashSet<ObjectId>,
303 _specific_resource_group: Option<String>,
304 _if_not_exists: bool,
305 ) -> Result<()> {
306 table.id = self.gen_id();
307 table.stream_job_status = PbStreamJobStatus::Created as _;
308 table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
309 self.catalog.write().create_table(&table);
310 self.add_table_or_source_id(table.id.as_raw_id(), table.schema_id, table.database_id);
311 self.hummock_snapshot_manager.add_table_for_test(table.id);
312 Ok(())
313 }
314
315 async fn replace_materialized_view(
316 &self,
317 mut table: PbTable,
318 _graph: StreamFragmentGraph,
319 ) -> Result<()> {
320 table.stream_job_status = PbStreamJobStatus::Created as _;
321 assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
322 self.catalog.write().update_table(&table);
323 Ok(())
324 }
325
326 async fn create_view(&self, mut view: PbView, _dependencies: HashSet<ObjectId>) -> Result<()> {
327 view.id = self.gen_id();
328 self.catalog.write().create_view(&view);
329 self.add_table_or_source_id(view.id.as_raw_id(), view.schema_id, view.database_id);
330 Ok(())
331 }
332
333 async fn create_table(
334 &self,
335 source: Option<PbSource>,
336 mut table: PbTable,
337 graph: StreamFragmentGraph,
338 _job_type: PbTableJobType,
339 if_not_exists: bool,
340 _dependencies: HashSet<ObjectId>,
341 ) -> Result<()> {
342 if let Some(source) = source {
343 let source_id = self.create_source_inner(source)?;
344 table.optional_associated_source_id = Some(source_id.into());
345 }
346 self.create_materialized_view(table, graph, HashSet::new(), None, if_not_exists)
347 .await?;
348 Ok(())
349 }
350
351 async fn replace_table(
352 &self,
353 _source: Option<PbSource>,
354 mut table: PbTable,
355 _graph: StreamFragmentGraph,
356 _job_type: TableJobType,
357 ) -> Result<()> {
358 table.stream_job_status = PbStreamJobStatus::Created as _;
359 assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
360 self.catalog.write().update_table(&table);
361 Ok(())
362 }
363
364 async fn replace_source(&self, source: PbSource, _graph: StreamFragmentGraph) -> Result<()> {
365 self.catalog.write().update_source(&source);
366 Ok(())
367 }
368
369 async fn create_source(
370 &self,
371 source: PbSource,
372 _graph: Option<StreamFragmentGraph>,
373 _if_not_exists: bool,
374 ) -> Result<()> {
375 self.create_source_inner(source).map(|_| ())
376 }
377
378 async fn create_sink(
379 &self,
380 sink: PbSink,
381 graph: StreamFragmentGraph,
382 _dependencies: HashSet<ObjectId>,
383 _if_not_exists: bool,
384 ) -> Result<()> {
385 self.create_sink_inner(sink, graph)
386 }
387
388 async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
389 self.create_subscription_inner(subscription)
390 }
391
392 async fn create_index(
393 &self,
394 mut index: PbIndex,
395 mut index_table: PbTable,
396 _graph: StreamFragmentGraph,
397 _if_not_exists: bool,
398 ) -> Result<()> {
399 index_table.id = self.gen_id();
400 index_table.stream_job_status = PbStreamJobStatus::Created as _;
401 index_table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
402 self.catalog.write().create_table(&index_table);
403 self.add_table_or_index_id(
404 index_table.id.as_raw_id(),
405 index_table.schema_id,
406 index_table.database_id,
407 );
408
409 index.id = index_table.id.as_raw_id().into();
410 index.index_table_id = index_table.id;
411 self.catalog.write().create_index(&index);
412 Ok(())
413 }
414
415 async fn create_function(&self, _function: PbFunction) -> Result<()> {
416 unreachable!()
417 }
418
419 async fn create_connection(
420 &self,
421 _connection_name: String,
422 _database_id: DatabaseId,
423 _schema_id: SchemaId,
424 _owner_id: u32,
425 _connection: create_connection_request::Payload,
426 ) -> Result<()> {
427 unreachable!()
428 }
429
430 async fn create_secret(
431 &self,
432 _secret_name: String,
433 _database_id: DatabaseId,
434 _schema_id: SchemaId,
435 _owner_id: u32,
436 _payload: Vec<u8>,
437 ) -> Result<()> {
438 unreachable!()
439 }
440
441 async fn comment_on(&self, _comment: PbComment) -> Result<()> {
442 unreachable!()
443 }
444
445 async fn drop_table(
446 &self,
447 source_id: Option<u32>,
448 table_id: TableId,
449 cascade: bool,
450 ) -> Result<()> {
451 if cascade {
452 return Err(ErrorCode::NotSupported(
453 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
454 "use drop instead".to_owned(),
455 )
456 .into());
457 }
458 if let Some(source_id) = source_id {
459 self.drop_table_or_source_id(source_id);
460 }
461 let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
462 let indexes =
463 self.catalog
464 .read()
465 .get_all_indexes_related_to_object(database_id, schema_id, table_id);
466 for index in indexes {
467 self.drop_index(index.id, cascade).await?;
468 }
469 self.catalog
470 .write()
471 .drop_table(database_id, schema_id, table_id);
472 if let Some(source_id) = source_id {
473 self.catalog
474 .write()
475 .drop_source(database_id, schema_id, source_id.into());
476 }
477 Ok(())
478 }
479
480 async fn drop_view(&self, _view_id: ViewId, _cascade: bool) -> Result<()> {
481 unreachable!()
482 }
483
484 async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
485 if cascade {
486 return Err(ErrorCode::NotSupported(
487 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
488 "use drop instead".to_owned(),
489 )
490 .into());
491 }
492 let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
493 let indexes =
494 self.catalog
495 .read()
496 .get_all_indexes_related_to_object(database_id, schema_id, table_id);
497 for index in indexes {
498 self.drop_index(index.id, cascade).await?;
499 }
500 self.catalog
501 .write()
502 .drop_table(database_id, schema_id, table_id);
503 Ok(())
504 }
505
506 async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()> {
507 if cascade {
508 return Err(ErrorCode::NotSupported(
509 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
510 "use drop instead".to_owned(),
511 )
512 .into());
513 }
514 let (database_id, schema_id) = self.drop_table_or_source_id(source_id.as_raw_id());
515 self.catalog
516 .write()
517 .drop_source(database_id, schema_id, source_id);
518 Ok(())
519 }
520
521 async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()> {
522 if cascade {
523 return Err(ErrorCode::NotSupported(
524 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
525 "use drop instead".to_owned(),
526 )
527 .into());
528 }
529 let (database_id, schema_id) = self.drop_table_or_sink_id(sink_id.as_raw_id());
530 self.catalog
531 .write()
532 .drop_sink(database_id, schema_id, sink_id);
533 Ok(())
534 }
535
536 async fn drop_subscription(
537 &self,
538 subscription_id: SubscriptionId,
539 cascade: bool,
540 ) -> Result<()> {
541 if cascade {
542 return Err(ErrorCode::NotSupported(
543 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
544 "use drop instead".to_owned(),
545 )
546 .into());
547 }
548 let (database_id, schema_id) =
549 self.drop_table_or_subscription_id(subscription_id.as_raw_id());
550 self.catalog
551 .write()
552 .drop_subscription(database_id, schema_id, subscription_id);
553 Ok(())
554 }
555
556 async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
557 if cascade {
558 return Err(ErrorCode::NotSupported(
559 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
560 "use drop instead".to_owned(),
561 )
562 .into());
563 }
564 let &schema_id = self
565 .table_id_to_schema_id
566 .read()
567 .get(&index_id.as_raw_id())
568 .unwrap();
569 let database_id = self.get_database_id_by_schema(schema_id);
570
571 let index = {
572 let catalog_reader = self.catalog.read();
573 let schema_catalog = catalog_reader
574 .get_schema_by_id(database_id, schema_id)
575 .unwrap();
576 schema_catalog.get_index_by_id(index_id).unwrap().clone()
577 };
578
579 let index_table_id = index.index_table().id;
580 let (database_id, schema_id) = self.drop_table_or_index_id(index_id.as_raw_id());
581 self.catalog
582 .write()
583 .drop_index(database_id, schema_id, index_id);
584 self.catalog
585 .write()
586 .drop_table(database_id, schema_id, index_table_id);
587 Ok(())
588 }
589
590 async fn drop_function(&self, _function_id: FunctionId, _cascade: bool) -> Result<()> {
591 unreachable!()
592 }
593
594 async fn drop_connection(&self, _connection_id: ConnectionId, _cascade: bool) -> Result<()> {
595 unreachable!()
596 }
597
598 async fn drop_secret(&self, _secret_id: SecretId) -> Result<()> {
599 unreachable!()
600 }
601
602 async fn drop_database(&self, database_id: DatabaseId) -> Result<()> {
603 self.catalog.write().drop_database(database_id);
604 Ok(())
605 }
606
607 async fn drop_schema(&self, schema_id: SchemaId, _cascade: bool) -> Result<()> {
608 let database_id = self.drop_schema_id(schema_id);
609 self.catalog.write().drop_schema(database_id, schema_id);
610 Ok(())
611 }
612
613 async fn alter_name(
614 &self,
615 object_id: alter_name_request::Object,
616 object_name: &str,
617 ) -> Result<()> {
618 match object_id {
619 alter_name_request::Object::TableId(table_id) => {
620 self.catalog
621 .write()
622 .alter_table_name_by_id(table_id.into(), object_name);
623 Ok(())
624 }
625 _ => {
626 unimplemented!()
627 }
628 }
629 }
630
631 async fn alter_source(&self, source: PbSource) -> Result<()> {
632 self.catalog.write().update_source(&source);
633 Ok(())
634 }
635
636 async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<()> {
637 for database in self.catalog.read().iter_databases() {
638 for schema in database.iter_schemas() {
639 match object {
640 Object::TableId(table_id) => {
641 if let Some(table) = schema.get_created_table_by_id(TableId::from(table_id))
642 {
643 let mut pb_table = table.to_prost();
644 pb_table.owner = owner_id;
645 self.catalog.write().update_table(&pb_table);
646 return Ok(());
647 }
648 }
649 _ => unreachable!(),
650 }
651 }
652 }
653
654 Err(ErrorCode::ItemNotFound(format!("object not found: {:?}", object)).into())
655 }
656
657 async fn alter_set_schema(
658 &self,
659 object: alter_set_schema_request::Object,
660 new_schema_id: SchemaId,
661 ) -> Result<()> {
662 match object {
663 alter_set_schema_request::Object::TableId(table_id) => {
664 let mut pb_table = {
665 let reader = self.catalog.read();
666 let table = reader.get_any_table_by_id(table_id.into())?.to_owned();
667 table.to_prost()
668 };
669 pb_table.schema_id = new_schema_id;
670 self.catalog.write().update_table(&pb_table);
671 self.table_id_to_schema_id
672 .write()
673 .insert(table_id, new_schema_id);
674 Ok(())
675 }
676 _ => unreachable!(),
677 }
678 }
679
680 async fn alter_parallelism(
681 &self,
682 _job_id: JobId,
683 _parallelism: PbTableParallelism,
684 _deferred: bool,
685 ) -> Result<()> {
686 todo!()
687 }
688
689 async fn alter_config(
690 &self,
691 _job_id: JobId,
692 _entries_to_add: HashMap<String, String>,
693 _keys_to_remove: Vec<String>,
694 ) -> Result<()> {
695 todo!()
696 }
697
698 async fn alter_swap_rename(&self, _object: alter_swap_rename_request::Object) -> Result<()> {
699 todo!()
700 }
701
702 async fn alter_secret(
703 &self,
704 _secret_id: SecretId,
705 _secret_name: String,
706 _database_id: DatabaseId,
707 _schema_id: SchemaId,
708 _owner_id: u32,
709 _payload: Vec<u8>,
710 ) -> Result<()> {
711 unreachable!()
712 }
713
714 async fn alter_resource_group(
715 &self,
716 _table_id: TableId,
717 _resource_group: Option<String>,
718 _deferred: bool,
719 ) -> Result<()> {
720 todo!()
721 }
722
723 async fn alter_database_param(
724 &self,
725 database_id: DatabaseId,
726 param: AlterDatabaseParam,
727 ) -> Result<()> {
728 let mut pb_database = {
729 let reader = self.catalog.read();
730 let database = reader.get_database_by_id(database_id)?.to_owned();
731 database.to_prost()
732 };
733 match param {
734 AlterDatabaseParam::BarrierIntervalMs(interval) => {
735 pb_database.barrier_interval_ms = interval;
736 }
737 AlterDatabaseParam::CheckpointFrequency(frequency) => {
738 pb_database.checkpoint_frequency = frequency;
739 }
740 }
741 self.catalog.write().update_database(&pb_database);
742 Ok(())
743 }
744
745 async fn create_iceberg_table(
746 &self,
747 _table_job_info: PbTableJobInfo,
748 _sink_job_info: PbSinkJobInfo,
749 _iceberg_source: PbSource,
750 _if_not_exists: bool,
751 ) -> Result<()> {
752 todo!()
753 }
754}
755
756impl MockCatalogWriter {
757 pub fn new(
758 catalog: Arc<RwLock<Catalog>>,
759 hummock_snapshot_manager: HummockSnapshotManagerRef,
760 ) -> Self {
761 catalog.write().create_database(&PbDatabase {
762 id: 0.into(),
763 name: DEFAULT_DATABASE_NAME.to_owned(),
764 owner: DEFAULT_SUPER_USER_ID,
765 resource_group: DEFAULT_RESOURCE_GROUP.to_owned(),
766 barrier_interval_ms: None,
767 checkpoint_frequency: None,
768 });
769 catalog.write().create_schema(&PbSchema {
770 id: 1.into(),
771 name: DEFAULT_SCHEMA_NAME.to_owned(),
772 database_id: 0.into(),
773 owner: DEFAULT_SUPER_USER_ID,
774 });
775 catalog.write().create_schema(&PbSchema {
776 id: 2.into(),
777 name: PG_CATALOG_SCHEMA_NAME.to_owned(),
778 database_id: 0.into(),
779 owner: DEFAULT_SUPER_USER_ID,
780 });
781 catalog.write().create_schema(&PbSchema {
782 id: 3.into(),
783 name: RW_CATALOG_SCHEMA_NAME.to_owned(),
784 database_id: 0.into(),
785 owner: DEFAULT_SUPER_USER_ID,
786 });
787 let mut map: HashMap<SchemaId, DatabaseId> = HashMap::new();
788 map.insert(1_u32.into(), 0_u32.into());
789 map.insert(2_u32.into(), 0_u32.into());
790 map.insert(3_u32.into(), 0_u32.into());
791 Self {
792 catalog,
793 id: AtomicU32::new(3),
794 table_id_to_schema_id: Default::default(),
795 schema_id_to_database_id: RwLock::new(map),
796 hummock_snapshot_manager,
797 }
798 }
799
800 fn gen_id<T: From<u32>>(&self) -> T {
801 (self.id.fetch_add(1, Ordering::SeqCst) + 1).into()
803 }
804
805 fn add_table_or_source_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
806 self.table_id_to_schema_id
807 .write()
808 .insert(table_id, schema_id);
809 }
810
811 fn drop_table_or_source_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
812 let schema_id = self
813 .table_id_to_schema_id
814 .write()
815 .remove(&table_id)
816 .unwrap();
817 (self.get_database_id_by_schema(schema_id), schema_id)
818 }
819
820 fn add_table_or_sink_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
821 self.table_id_to_schema_id
822 .write()
823 .insert(table_id, schema_id);
824 }
825
826 fn add_table_or_subscription_id(
827 &self,
828 table_id: u32,
829 schema_id: SchemaId,
830 _database_id: DatabaseId,
831 ) {
832 self.table_id_to_schema_id
833 .write()
834 .insert(table_id, schema_id);
835 }
836
837 fn add_table_or_index_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
838 self.table_id_to_schema_id
839 .write()
840 .insert(table_id, schema_id);
841 }
842
843 fn drop_table_or_sink_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
844 let schema_id = self
845 .table_id_to_schema_id
846 .write()
847 .remove(&table_id)
848 .unwrap();
849 (self.get_database_id_by_schema(schema_id), schema_id)
850 }
851
852 fn drop_table_or_subscription_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
853 let schema_id = self
854 .table_id_to_schema_id
855 .write()
856 .remove(&table_id)
857 .unwrap();
858 (self.get_database_id_by_schema(schema_id), schema_id)
859 }
860
861 fn drop_table_or_index_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
862 let schema_id = self
863 .table_id_to_schema_id
864 .write()
865 .remove(&table_id)
866 .unwrap();
867 (self.get_database_id_by_schema(schema_id), schema_id)
868 }
869
870 fn add_schema_id(&self, schema_id: SchemaId, database_id: DatabaseId) {
871 self.schema_id_to_database_id
872 .write()
873 .insert(schema_id, database_id);
874 }
875
876 fn drop_schema_id(&self, schema_id: SchemaId) -> DatabaseId {
877 self.schema_id_to_database_id
878 .write()
879 .remove(&schema_id)
880 .unwrap()
881 }
882
883 fn create_source_inner(&self, mut source: PbSource) -> Result<SourceId> {
884 source.id = self.gen_id();
885 self.catalog.write().create_source(&source);
886 self.add_table_or_source_id(source.id.as_raw_id(), source.schema_id, source.database_id);
887 Ok(source.id)
888 }
889
890 fn create_sink_inner(&self, mut sink: PbSink, _graph: StreamFragmentGraph) -> Result<()> {
891 sink.id = self.gen_id();
892 sink.stream_job_status = PbStreamJobStatus::Created as _;
893 self.catalog.write().create_sink(&sink);
894 self.add_table_or_sink_id(sink.id.as_raw_id(), sink.schema_id, sink.database_id);
895 Ok(())
896 }
897
898 fn create_subscription_inner(&self, mut subscription: PbSubscription) -> Result<()> {
899 subscription.id = self.gen_id();
900 self.catalog.write().create_subscription(&subscription);
901 self.add_table_or_subscription_id(
902 subscription.id.as_raw_id(),
903 subscription.schema_id,
904 subscription.database_id,
905 );
906 Ok(())
907 }
908
909 fn get_database_id_by_schema(&self, schema_id: SchemaId) -> DatabaseId {
910 *self
911 .schema_id_to_database_id
912 .read()
913 .get(&schema_id)
914 .unwrap()
915 }
916}
917
/// User-info writer that applies every change directly to a shared [`UserInfoManager`].
pub struct MockUserInfoWriter {
    /// Next user id to hand out; `gen_id` returns the current value and then increments it.
    id: AtomicU32,
    /// User manager that all create/update/drop/grant/revoke operations mutate.
    user_info: Arc<RwLock<UserInfoManager>>,
}
922
923#[async_trait::async_trait]
924impl UserInfoWriter for MockUserInfoWriter {
925 async fn create_user(&self, user: UserInfo) -> Result<()> {
926 let mut user = user;
927 user.id = self.gen_id();
928 self.user_info.write().create_user(user);
929 Ok(())
930 }
931
932 async fn drop_user(&self, id: UserId) -> Result<()> {
933 self.user_info.write().drop_user(id);
934 Ok(())
935 }
936
937 async fn update_user(
938 &self,
939 update_user: UserInfo,
940 update_fields: Vec<UpdateField>,
941 ) -> Result<()> {
942 let mut lock = self.user_info.write();
943 let id = update_user.get_id();
944 let Some(old_name) = lock.get_user_name_by_id(id) else {
945 return Ok(());
946 };
947 let mut user_info = lock.get_user_by_name(&old_name).unwrap().to_prost();
948 update_fields.into_iter().for_each(|field| match field {
949 UpdateField::Super => user_info.is_super = update_user.is_super,
950 UpdateField::Login => user_info.can_login = update_user.can_login,
951 UpdateField::CreateDb => user_info.can_create_db = update_user.can_create_db,
952 UpdateField::CreateUser => user_info.can_create_user = update_user.can_create_user,
953 UpdateField::AuthInfo => user_info.auth_info.clone_from(&update_user.auth_info),
954 UpdateField::Rename => user_info.name.clone_from(&update_user.name),
955 UpdateField::Admin => user_info.is_admin = update_user.is_admin,
956 UpdateField::Unspecified => unreachable!(),
957 });
958 lock.update_user(update_user);
959 Ok(())
960 }
961
962 async fn grant_privilege(
965 &self,
966 users: Vec<UserId>,
967 privileges: Vec<GrantPrivilege>,
968 with_grant_option: bool,
969 _grantor: UserId,
970 ) -> Result<()> {
971 let privileges = privileges
972 .into_iter()
973 .map(|mut p| {
974 p.action_with_opts
975 .iter_mut()
976 .for_each(|ao| ao.with_grant_option = with_grant_option);
977 p
978 })
979 .collect::<Vec<_>>();
980 for user_id in users {
981 if let Some(u) = self.user_info.write().get_user_mut(user_id) {
982 u.extend_privileges(privileges.clone());
983 }
984 }
985 Ok(())
986 }
987
988 async fn revoke_privilege(
991 &self,
992 users: Vec<UserId>,
993 privileges: Vec<GrantPrivilege>,
994 _granted_by: UserId,
995 _revoke_by: UserId,
996 revoke_grant_option: bool,
997 _cascade: bool,
998 ) -> Result<()> {
999 for user_id in users {
1000 if let Some(u) = self.user_info.write().get_user_mut(user_id) {
1001 u.revoke_privileges(privileges.clone(), revoke_grant_option);
1002 }
1003 }
1004 Ok(())
1005 }
1006
1007 async fn alter_default_privilege(
1008 &self,
1009 _users: Vec<UserId>,
1010 _database_id: DatabaseId,
1011 _schemas: Vec<SchemaId>,
1012 _operation: AlterDefaultPrivilegeOperation,
1013 _operated_by: UserId,
1014 ) -> Result<()> {
1015 todo!()
1016 }
1017}
1018
1019impl MockUserInfoWriter {
1020 pub fn new(user_info: Arc<RwLock<UserInfoManager>>) -> Self {
1021 user_info.write().create_user(UserInfo {
1022 id: DEFAULT_SUPER_USER_ID,
1023 name: DEFAULT_SUPER_USER.to_owned(),
1024 is_super: true,
1025 can_create_db: true,
1026 can_create_user: true,
1027 can_login: true,
1028 ..Default::default()
1029 });
1030 user_info.write().create_user(UserInfo {
1031 id: DEFAULT_SUPER_USER_FOR_ADMIN_ID,
1032 name: DEFAULT_SUPER_USER_FOR_ADMIN.to_owned(),
1033 is_super: true,
1034 can_create_db: true,
1035 can_create_user: true,
1036 can_login: true,
1037 is_admin: true,
1038 ..Default::default()
1039 });
1040 Self {
1041 user_info,
1042 id: AtomicU32::new(NON_RESERVED_USER_ID as u32),
1043 }
1044 }
1045
1046 fn gen_id(&self) -> u32 {
1047 self.id.fetch_add(1, Ordering::SeqCst)
1048 }
1049}
1050
/// Stateless stand-in implementing [`FrontendMetaClient`]; it has no fields.
pub struct MockFrontendMetaClient {}
1052
1053#[async_trait::async_trait]
1054impl FrontendMetaClient for MockFrontendMetaClient {
/// Mock: does nothing.
async fn try_unregister(&self) {}
1056
/// Mock: ignores the database id and always reports `INVALID_VERSION_ID`.
async fn flush(&self, _database_id: DatabaseId) -> RpcResult<HummockVersionId> {
    Ok(INVALID_VERSION_ID)
}
1060
/// Mock: returns success immediately.
async fn wait(&self) -> RpcResult<()> {
    Ok(())
}
1064
1065 async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult<Vec<u32>> {
1066 Ok(vec![])
1067 }
1068
1069 async fn list_table_fragments(
1070 &self,
1071 _table_ids: &[JobId],
1072 ) -> RpcResult<HashMap<JobId, TableFragmentInfo>> {
1073 Ok(HashMap::default())
1074 }
1075
1076 async fn list_streaming_job_states(&self) -> RpcResult<Vec<StreamingJobState>> {
1077 Ok(vec![])
1078 }
1079
1080 async fn list_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
1081 Ok(vec![])
1082 }
1083
1084 async fn list_creating_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
1085 Ok(vec![])
1086 }
1087
1088 async fn list_actor_states(&self) -> RpcResult<Vec<ActorState>> {
1089 Ok(vec![])
1090 }
1091
1092 async fn list_actor_splits(&self) -> RpcResult<Vec<ActorSplit>> {
1093 Ok(vec![])
1094 }
1095
1096 async fn list_object_dependencies(&self) -> RpcResult<Vec<PbObjectDependencies>> {
1097 Ok(vec![])
1098 }
1099
1100 async fn list_meta_snapshots(&self) -> RpcResult<Vec<MetaSnapshotMetadata>> {
1101 Ok(vec![])
1102 }
1103
1104 async fn set_system_param(
1105 &self,
1106 _param: String,
1107 _value: Option<String>,
1108 ) -> RpcResult<Option<SystemParamsReader>> {
1109 Ok(Some(SystemParams::default().into()))
1110 }
1111
1112 async fn get_session_params(&self) -> RpcResult<SessionConfig> {
1113 Ok(Default::default())
1114 }
1115
1116 async fn set_session_param(&self, _param: String, _value: Option<String>) -> RpcResult<String> {
1117 Ok("".to_owned())
1118 }
1119
1120 async fn get_ddl_progress(&self) -> RpcResult<Vec<DdlProgress>> {
1121 Ok(vec![])
1122 }
1123
1124 async fn get_tables(
1125 &self,
1126 _table_ids: Vec<crate::catalog::TableId>,
1127 _include_dropped_tables: bool,
1128 ) -> RpcResult<HashMap<crate::catalog::TableId, Table>> {
1129 Ok(HashMap::new())
1130 }
1131
1132 async fn list_hummock_pinned_versions(&self) -> RpcResult<Vec<(WorkerId, u64)>> {
1133 unimplemented!()
1134 }
1135
1136 async fn list_refresh_table_states(&self) -> RpcResult<Vec<RefreshTableState>> {
1137 unimplemented!()
1138 }
1139
1140 async fn get_hummock_current_version(&self) -> RpcResult<HummockVersion> {
1141 Ok(HummockVersion::default())
1142 }
1143
1144 async fn get_hummock_checkpoint_version(&self) -> RpcResult<HummockVersion> {
1145 unimplemented!()
1146 }
1147
1148 async fn list_version_deltas(&self) -> RpcResult<Vec<HummockVersionDelta>> {
1149 unimplemented!()
1150 }
1151
1152 async fn list_branched_objects(&self) -> RpcResult<Vec<BranchedObject>> {
1153 unimplemented!()
1154 }
1155
1156 async fn list_hummock_compaction_group_configs(&self) -> RpcResult<Vec<CompactionGroupInfo>> {
1157 unimplemented!()
1158 }
1159
1160 async fn list_hummock_active_write_limits(&self) -> RpcResult<HashMap<u64, WriteLimit>> {
1161 unimplemented!()
1162 }
1163
1164 async fn list_hummock_meta_configs(&self) -> RpcResult<HashMap<String, String>> {
1165 unimplemented!()
1166 }
1167
1168 async fn list_event_log(&self) -> RpcResult<Vec<EventLog>> {
1169 Ok(vec![])
1170 }
1171
1172 async fn list_compact_task_assignment(&self) -> RpcResult<Vec<CompactTaskAssignment>> {
1173 unimplemented!()
1174 }
1175
1176 async fn list_all_nodes(&self) -> RpcResult<Vec<WorkerNode>> {
1177 Ok(vec![])
1178 }
1179
1180 async fn list_compact_task_progress(&self) -> RpcResult<Vec<CompactTaskProgress>> {
1181 unimplemented!()
1182 }
1183
1184 async fn recover(&self) -> RpcResult<()> {
1185 unimplemented!()
1186 }
1187
1188 async fn apply_throttle(
1189 &self,
1190 _kind: PbThrottleTarget,
1191 _id: u32,
1192 _rate_limit: Option<u32>,
1193 ) -> RpcResult<()> {
1194 unimplemented!()
1195 }
1196
1197 async fn alter_fragment_parallelism(
1198 &self,
1199 _fragment_ids: Vec<FragmentId>,
1200 _parallelism: Option<PbTableParallelism>,
1201 ) -> RpcResult<()> {
1202 unimplemented!()
1203 }
1204
1205 async fn get_cluster_recovery_status(&self) -> RpcResult<RecoveryStatus> {
1206 Ok(RecoveryStatus::StatusRunning)
1207 }
1208
1209 async fn get_cluster_limits(&self) -> RpcResult<Vec<ClusterLimit>> {
1210 Ok(vec![])
1211 }
1212
1213 async fn list_rate_limits(&self) -> RpcResult<Vec<RateLimitInfo>> {
1214 Ok(vec![])
1215 }
1216
1217 async fn list_cdc_progress(&self) -> RpcResult<HashMap<JobId, PbCdcProgress>> {
1218 Ok(HashMap::default())
1219 }
1220
1221 async fn get_meta_store_endpoint(&self) -> RpcResult<String> {
1222 unimplemented!()
1223 }
1224
1225 async fn alter_sink_props(
1226 &self,
1227 _sink_id: SinkId,
1228 _changed_props: BTreeMap<String, String>,
1229 _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1230 _connector_conn_ref: Option<ConnectionId>,
1231 ) -> RpcResult<()> {
1232 unimplemented!()
1233 }
1234
1235 async fn alter_iceberg_table_props(
1236 &self,
1237 _table_id: TableId,
1238 _sink_id: SinkId,
1239 _source_id: SourceId,
1240 _changed_props: BTreeMap<String, String>,
1241 _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1242 _connector_conn_ref: Option<ConnectionId>,
1243 ) -> RpcResult<()> {
1244 unimplemented!()
1245 }
1246
1247 async fn alter_source_connector_props(
1248 &self,
1249 _source_id: SourceId,
1250 _changed_props: BTreeMap<String, String>,
1251 _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1252 _connector_conn_ref: Option<ConnectionId>,
1253 ) -> RpcResult<()> {
1254 unimplemented!()
1255 }
1256
1257 async fn list_hosted_iceberg_tables(&self) -> RpcResult<Vec<IcebergTable>> {
1258 unimplemented!()
1259 }
1260
1261 async fn get_fragment_by_id(
1262 &self,
1263 _fragment_id: FragmentId,
1264 ) -> RpcResult<Option<FragmentDistribution>> {
1265 unimplemented!()
1266 }
1267
1268 async fn get_fragment_vnodes(
1269 &self,
1270 _fragment_id: FragmentId,
1271 ) -> RpcResult<Vec<(ActorId, Vec<u32>)>> {
1272 unimplemented!()
1273 }
1274
1275 async fn get_actor_vnodes(&self, _actor_id: ActorId) -> RpcResult<Vec<u32>> {
1276 unimplemented!()
1277 }
1278
1279 fn worker_id(&self) -> WorkerId {
1280 0.into()
1281 }
1282
1283 async fn set_sync_log_store_aligned(&self, _job_id: JobId, _aligned: bool) -> RpcResult<()> {
1284 Ok(())
1285 }
1286
1287 async fn compact_iceberg_table(&self, _sink_id: SinkId) -> RpcResult<u64> {
1288 Ok(1)
1289 }
1290
1291 async fn expire_iceberg_table_snapshots(&self, _sink_id: SinkId) -> RpcResult<()> {
1292 Ok(())
1293 }
1294
1295 async fn refresh(&self, _request: RefreshRequest) -> RpcResult<RefreshResponse> {
1296 Ok(RefreshResponse { status: None })
1297 }
1298
1299 fn cluster_id(&self) -> &str {
1300 "test-cluster-uuid"
1301 }
1302
1303 async fn list_unmigrated_tables(&self) -> RpcResult<HashMap<crate::catalog::TableId, String>> {
1304 unimplemented!()
1305 }
1306}
1307
/// Protobuf (`proto3`) schema source, compiled only in test builds.
///
/// Declares package `test` with the messages `TestRecord`, `TestRecordAlterType`
/// (same field numbers as `TestRecord` but `id` is a `string` and `zipcode` an `int32`),
/// `TestRecordExt` (`TestRecord` plus a `name` field), and the nested `Country` / `City`
/// messages.
#[cfg(test)]
pub static PROTO_FILE_DATA: &str = r#"
 syntax = "proto3";
 package test;
 message TestRecord {
 int32 id = 1;
 Country country = 3;
 int64 zipcode = 4;
 float rate = 5;
 }
 message TestRecordAlterType {
 string id = 1;
 Country country = 3;
 int32 zipcode = 4;
 float rate = 5;
 }
 message TestRecordExt {
 int32 id = 1;
 Country country = 3;
 int64 zipcode = 4;
 float rate = 5;
 string name = 6;
 }
 message Country {
 string address = 1;
 City city = 2;
 string zipcode = 3;
 }
 message City {
 string address = 1;
 string zipcode = 2;
 }"#;
1340
1341pub fn create_proto_file(proto_data: &str) -> NamedTempFile {
1344 let in_file = Builder::new()
1345 .prefix("temp")
1346 .suffix(".proto")
1347 .rand_bytes(8)
1348 .tempfile()
1349 .unwrap();
1350
1351 let out_file = Builder::new()
1352 .prefix("temp")
1353 .suffix(".pb")
1354 .rand_bytes(8)
1355 .tempfile()
1356 .unwrap();
1357
1358 let mut file = in_file.as_file();
1359 file.write_all(proto_data.as_ref())
1360 .expect("writing binary to test file");
1361 file.flush().expect("flush temp file failed");
1362 let include_path = in_file
1363 .path()
1364 .parent()
1365 .unwrap()
1366 .to_string_lossy()
1367 .into_owned();
1368 let out_path = out_file.path().to_string_lossy().into_owned();
1369 let in_path = in_file.path().to_string_lossy().into_owned();
1370 let mut compile = std::process::Command::new("protoc");
1371
1372 let out = compile
1373 .arg("--include_imports")
1374 .arg("-I")
1375 .arg(include_path)
1376 .arg(format!("--descriptor_set_out={}", out_path))
1377 .arg(in_path)
1378 .output()
1379 .expect("failed to compile proto");
1380 if !out.status.success() {
1381 panic!("compile proto failed \n output: {:?}", out);
1382 }
1383 out_file
1384}