1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::io::Write;
17use std::net::{IpAddr, Ipv4Addr, SocketAddr};
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU32, Ordering};
20
21use futures_async_stream::for_await;
22use parking_lot::RwLock;
23use pgwire::net::{Address, AddressRef};
24use pgwire::pg_response::StatementType;
25use pgwire::pg_server::{SessionId, SessionManager, UserAuthenticator};
26use pgwire::types::Row;
27use risingwave_common::catalog::{
28 AlterDatabaseParam, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER,
29 DEFAULT_SUPER_USER_FOR_ADMIN, DEFAULT_SUPER_USER_FOR_ADMIN_ID, DEFAULT_SUPER_USER_ID,
30 FunctionId, IndexId, NON_RESERVED_USER_ID, ObjectId, PG_CATALOG_SCHEMA_NAME,
31 RW_CATALOG_SCHEMA_NAME, TableId,
32};
33use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat};
34use risingwave_common::id::{ConnectionId, JobId, SourceId, SubscriptionId, ViewId, WorkerId};
35use risingwave_common::session_config::SessionConfig;
36use risingwave_common::system_param::reader::SystemParamsReader;
37use risingwave_common::util::cluster_limit::ClusterLimit;
38use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
39use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
40use risingwave_hummock_sdk::{HummockVersionId, INVALID_VERSION_ID};
41use risingwave_pb::backup_service::MetaSnapshotMetadata;
42use risingwave_pb::catalog::{
43 PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbStreamJobStatus,
44 PbSubscription, PbTable, PbView, Table,
45};
46use risingwave_pb::common::WorkerNode;
47use risingwave_pb::ddl_service::alter_owner_request::Object;
48use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
49use risingwave_pb::ddl_service::{
50 DdlProgress, PbTableJobType, TableJobType, alter_name_request, alter_set_schema_request,
51 alter_swap_rename_request, create_connection_request,
52};
53use risingwave_pb::hummock::write_limits::WriteLimit;
54use risingwave_pb::hummock::{
55 BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
56};
57use risingwave_pb::id::ActorId;
58use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
59use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
60use risingwave_pb::meta::list_actor_states_response::ActorState;
61use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
62use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
63use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
64use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
65use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
66use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
67use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
68use risingwave_pb::meta::{
69 EventLog, FragmentDistribution, PbTableParallelism, PbThrottleTarget, RecoveryStatus,
70 RefreshRequest, RefreshResponse, SystemParams,
71};
72use risingwave_pb::secret::PbSecretRef;
73use risingwave_pb::stream_plan::StreamFragmentGraph;
74use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
75use risingwave_pb::user::update_user_request::UpdateField;
76use risingwave_pb::user::{GrantPrivilege, UserInfo};
77use risingwave_rpc_client::error::Result as RpcResult;
78use tempfile::{Builder, NamedTempFile};
79
80use crate::FrontendOpts;
81use crate::catalog::catalog_service::CatalogWriter;
82use crate::catalog::root_catalog::Catalog;
83use crate::catalog::{DatabaseId, FragmentId, SchemaId, SecretId, SinkId};
84use crate::error::{ErrorCode, Result, RwError};
85use crate::handler::RwPgResponse;
86use crate::meta_client::FrontendMetaClient;
87use crate::scheduler::HummockSnapshotManagerRef;
88use crate::session::{AuthContext, FrontendEnv, SessionImpl};
89use crate::user::UserId;
90use crate::user::user_manager::UserInfoManager;
91use crate::user::user_service::UserInfoWriter;
92
93pub struct LocalFrontend {
95 pub opts: FrontendOpts,
96 env: FrontendEnv,
97}
98
99impl SessionManager for LocalFrontend {
100 type Error = RwError;
101 type Session = SessionImpl;
102
103 fn create_dummy_session(
104 &self,
105 _database_id: DatabaseId,
106 _user_name: u32,
107 ) -> std::result::Result<Arc<Self::Session>, Self::Error> {
108 unreachable!()
109 }
110
111 fn connect(
112 &self,
113 _database: &str,
114 _user_name: &str,
115 _peer_addr: AddressRef,
116 ) -> std::result::Result<Arc<Self::Session>, Self::Error> {
117 Ok(self.session_ref())
118 }
119
120 fn cancel_queries_in_session(&self, _session_id: SessionId) {
121 unreachable!()
122 }
123
124 fn cancel_creating_jobs_in_session(&self, _session_id: SessionId) {
125 unreachable!()
126 }
127
128 fn end_session(&self, _session: &Self::Session) {
129 unreachable!()
130 }
131}
132
133impl LocalFrontend {
134 #[expect(clippy::unused_async)]
135 pub async fn new(opts: FrontendOpts) -> Self {
136 let env = FrontendEnv::mock();
137 Self { opts, env }
138 }
139
140 pub async fn run_sql(
141 &self,
142 sql: impl Into<String>,
143 ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
144 let sql: Arc<str> = Arc::from(sql.into());
145 self.session_ref().run_statement(sql, vec![]).await
146 }
147
148 pub async fn run_sql_with_session(
149 &self,
150 session_ref: Arc<SessionImpl>,
151 sql: impl Into<String>,
152 ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
153 let sql: Arc<str> = Arc::from(sql.into());
154 session_ref.run_statement(sql, vec![]).await
155 }
156
157 pub async fn run_user_sql(
158 &self,
159 sql: impl Into<String>,
160 database: String,
161 user_name: String,
162 user_id: UserId,
163 ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
164 let sql: Arc<str> = Arc::from(sql.into());
165 self.session_user_ref(database, user_name, user_id)
166 .run_statement(sql, vec![])
167 .await
168 }
169
170 pub async fn query_formatted_result(&self, sql: impl Into<String>) -> Vec<String> {
171 let mut rsp = self.run_sql(sql).await.unwrap();
172 let mut res = vec![];
173 #[for_await]
174 for row_set in rsp.values_stream() {
175 for row in row_set.unwrap() {
176 res.push(format!("{:?}", row));
177 }
178 }
179 res
180 }
181
182 pub async fn get_explain_output(&self, sql: impl Into<String>) -> String {
183 let mut rsp = self.run_sql(sql).await.unwrap();
184 assert_eq!(rsp.stmt_type(), StatementType::EXPLAIN);
185 let mut res = String::new();
186 #[for_await]
187 for row_set in rsp.values_stream() {
188 for row in row_set.unwrap() {
189 let row: Row = row;
190 let row = row.values()[0].as_ref().unwrap();
191 res += std::str::from_utf8(row).unwrap();
192 res += "\n";
193 }
194 }
195 res
196 }
197
198 pub fn session_ref(&self) -> Arc<SessionImpl> {
200 self.session_user_ref(
201 DEFAULT_DATABASE_NAME.to_owned(),
202 DEFAULT_SUPER_USER.to_owned(),
203 DEFAULT_SUPER_USER_ID,
204 )
205 }
206
207 pub fn session_user_ref(
208 &self,
209 database: String,
210 user_name: String,
211 user_id: UserId,
212 ) -> Arc<SessionImpl> {
213 Arc::new(SessionImpl::new(
214 self.env.clone(),
215 AuthContext::new(database, user_name, user_id),
216 UserAuthenticator::None,
217 (0, 0),
219 Address::Tcp(SocketAddr::new(
220 IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
221 6666,
222 ))
223 .into(),
224 Default::default(),
225 ))
226 }
227}
228
229pub async fn get_explain_output(mut rsp: RwPgResponse) -> String {
230 if rsp.stmt_type() != StatementType::EXPLAIN {
231 panic!("RESPONSE INVALID: {rsp:?}");
232 }
233 let mut res = String::new();
234 #[for_await]
235 for row_set in rsp.values_stream() {
236 for row in row_set.unwrap() {
237 let row: Row = row;
238 let row = row.values()[0].as_ref().unwrap();
239 res += std::str::from_utf8(row).unwrap();
240 res += "\n";
241 }
242 }
243 res
244}
245
246pub struct MockCatalogWriter {
247 catalog: Arc<RwLock<Catalog>>,
248 id: AtomicU32,
249 table_id_to_schema_id: RwLock<HashMap<u32, SchemaId>>,
250 schema_id_to_database_id: RwLock<HashMap<SchemaId, DatabaseId>>,
251 hummock_snapshot_manager: HummockSnapshotManagerRef,
252}
253
254#[async_trait::async_trait]
255impl CatalogWriter for MockCatalogWriter {
256 async fn create_database(
257 &self,
258 db_name: &str,
259 owner: UserId,
260 resource_group: &str,
261 barrier_interval_ms: Option<u32>,
262 checkpoint_frequency: Option<u64>,
263 ) -> Result<()> {
264 let database_id = DatabaseId::new(self.gen_id());
265 self.catalog.write().create_database(&PbDatabase {
266 name: db_name.to_owned(),
267 id: database_id,
268 owner,
269 resource_group: resource_group.to_owned(),
270 barrier_interval_ms,
271 checkpoint_frequency,
272 });
273 self.create_schema(database_id, DEFAULT_SCHEMA_NAME, owner)
274 .await?;
275 self.create_schema(database_id, PG_CATALOG_SCHEMA_NAME, owner)
276 .await?;
277 self.create_schema(database_id, RW_CATALOG_SCHEMA_NAME, owner)
278 .await?;
279 Ok(())
280 }
281
282 async fn create_schema(
283 &self,
284 db_id: DatabaseId,
285 schema_name: &str,
286 owner: UserId,
287 ) -> Result<()> {
288 let id = self.gen_id();
289 self.catalog.write().create_schema(&PbSchema {
290 id,
291 name: schema_name.to_owned(),
292 database_id: db_id,
293 owner,
294 });
295 self.add_schema_id(id, db_id);
296 Ok(())
297 }
298
299 async fn create_materialized_view(
300 &self,
301 mut table: PbTable,
302 _graph: StreamFragmentGraph,
303 _dependencies: HashSet<ObjectId>,
304 _specific_resource_group: Option<String>,
305 _if_not_exists: bool,
306 ) -> Result<()> {
307 table.id = self.gen_id();
308 table.stream_job_status = PbStreamJobStatus::Created as _;
309 table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
310 self.catalog.write().create_table(&table);
311 self.add_table_or_source_id(table.id.as_raw_id(), table.schema_id, table.database_id);
312 self.hummock_snapshot_manager.add_table_for_test(table.id);
313 Ok(())
314 }
315
316 async fn replace_materialized_view(
317 &self,
318 mut table: PbTable,
319 _graph: StreamFragmentGraph,
320 ) -> Result<()> {
321 table.stream_job_status = PbStreamJobStatus::Created as _;
322 assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
323 self.catalog.write().update_table(&table);
324 Ok(())
325 }
326
327 async fn create_view(&self, mut view: PbView, _dependencies: HashSet<ObjectId>) -> Result<()> {
328 view.id = self.gen_id();
329 self.catalog.write().create_view(&view);
330 self.add_table_or_source_id(view.id.as_raw_id(), view.schema_id, view.database_id);
331 Ok(())
332 }
333
334 async fn create_table(
335 &self,
336 source: Option<PbSource>,
337 mut table: PbTable,
338 graph: StreamFragmentGraph,
339 _job_type: PbTableJobType,
340 if_not_exists: bool,
341 _dependencies: HashSet<ObjectId>,
342 ) -> Result<()> {
343 if let Some(source) = source {
344 let source_id = self.create_source_inner(source)?;
345 table.optional_associated_source_id = Some(source_id.into());
346 }
347 self.create_materialized_view(table, graph, HashSet::new(), None, if_not_exists)
348 .await?;
349 Ok(())
350 }
351
352 async fn replace_table(
353 &self,
354 _source: Option<PbSource>,
355 mut table: PbTable,
356 _graph: StreamFragmentGraph,
357 _job_type: TableJobType,
358 ) -> Result<()> {
359 table.stream_job_status = PbStreamJobStatus::Created as _;
360 assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
361 self.catalog.write().update_table(&table);
362 Ok(())
363 }
364
365 async fn replace_source(&self, source: PbSource, _graph: StreamFragmentGraph) -> Result<()> {
366 self.catalog.write().update_source(&source);
367 Ok(())
368 }
369
370 async fn create_source(
371 &self,
372 source: PbSource,
373 _graph: Option<StreamFragmentGraph>,
374 _if_not_exists: bool,
375 ) -> Result<()> {
376 self.create_source_inner(source).map(|_| ())
377 }
378
379 async fn create_sink(
380 &self,
381 sink: PbSink,
382 graph: StreamFragmentGraph,
383 _dependencies: HashSet<ObjectId>,
384 _if_not_exists: bool,
385 ) -> Result<()> {
386 self.create_sink_inner(sink, graph)
387 }
388
389 async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
390 self.create_subscription_inner(subscription)
391 }
392
393 async fn create_index(
394 &self,
395 mut index: PbIndex,
396 mut index_table: PbTable,
397 _graph: StreamFragmentGraph,
398 _if_not_exists: bool,
399 ) -> Result<()> {
400 index_table.id = self.gen_id();
401 index_table.stream_job_status = PbStreamJobStatus::Created as _;
402 index_table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
403 self.catalog.write().create_table(&index_table);
404 self.add_table_or_index_id(
405 index_table.id.as_raw_id(),
406 index_table.schema_id,
407 index_table.database_id,
408 );
409
410 index.id = index_table.id.as_raw_id().into();
411 index.index_table_id = index_table.id;
412 self.catalog.write().create_index(&index);
413 Ok(())
414 }
415
416 async fn create_function(&self, _function: PbFunction) -> Result<()> {
417 unreachable!()
418 }
419
420 async fn create_connection(
421 &self,
422 _connection_name: String,
423 _database_id: DatabaseId,
424 _schema_id: SchemaId,
425 _owner_id: u32,
426 _connection: create_connection_request::Payload,
427 ) -> Result<()> {
428 unreachable!()
429 }
430
431 async fn create_secret(
432 &self,
433 _secret_name: String,
434 _database_id: DatabaseId,
435 _schema_id: SchemaId,
436 _owner_id: u32,
437 _payload: Vec<u8>,
438 ) -> Result<()> {
439 unreachable!()
440 }
441
442 async fn comment_on(&self, _comment: PbComment) -> Result<()> {
443 unreachable!()
444 }
445
446 async fn drop_table(
447 &self,
448 source_id: Option<u32>,
449 table_id: TableId,
450 cascade: bool,
451 ) -> Result<()> {
452 if cascade {
453 return Err(ErrorCode::NotSupported(
454 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
455 "use drop instead".to_owned(),
456 )
457 .into());
458 }
459 if let Some(source_id) = source_id {
460 self.drop_table_or_source_id(source_id);
461 }
462 let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
463 let indexes =
464 self.catalog
465 .read()
466 .get_all_indexes_related_to_object(database_id, schema_id, table_id);
467 for index in indexes {
468 self.drop_index(index.id, cascade).await?;
469 }
470 self.catalog
471 .write()
472 .drop_table(database_id, schema_id, table_id);
473 if let Some(source_id) = source_id {
474 self.catalog
475 .write()
476 .drop_source(database_id, schema_id, source_id.into());
477 }
478 Ok(())
479 }
480
481 async fn drop_view(&self, _view_id: ViewId, _cascade: bool) -> Result<()> {
482 unreachable!()
483 }
484
485 async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
486 if cascade {
487 return Err(ErrorCode::NotSupported(
488 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
489 "use drop instead".to_owned(),
490 )
491 .into());
492 }
493 let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
494 let indexes =
495 self.catalog
496 .read()
497 .get_all_indexes_related_to_object(database_id, schema_id, table_id);
498 for index in indexes {
499 self.drop_index(index.id, cascade).await?;
500 }
501 self.catalog
502 .write()
503 .drop_table(database_id, schema_id, table_id);
504 Ok(())
505 }
506
507 async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()> {
508 if cascade {
509 return Err(ErrorCode::NotSupported(
510 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
511 "use drop instead".to_owned(),
512 )
513 .into());
514 }
515 let (database_id, schema_id) = self.drop_table_or_source_id(source_id.as_raw_id());
516 self.catalog
517 .write()
518 .drop_source(database_id, schema_id, source_id);
519 Ok(())
520 }
521
522 async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()> {
523 if cascade {
524 return Err(ErrorCode::NotSupported(
525 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
526 "use drop instead".to_owned(),
527 )
528 .into());
529 }
530 let (database_id, schema_id) = self.drop_table_or_sink_id(sink_id.as_raw_id());
531 self.catalog
532 .write()
533 .drop_sink(database_id, schema_id, sink_id);
534 Ok(())
535 }
536
537 async fn drop_subscription(
538 &self,
539 subscription_id: SubscriptionId,
540 cascade: bool,
541 ) -> Result<()> {
542 if cascade {
543 return Err(ErrorCode::NotSupported(
544 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
545 "use drop instead".to_owned(),
546 )
547 .into());
548 }
549 let (database_id, schema_id) =
550 self.drop_table_or_subscription_id(subscription_id.as_raw_id());
551 self.catalog
552 .write()
553 .drop_subscription(database_id, schema_id, subscription_id);
554 Ok(())
555 }
556
557 async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
558 if cascade {
559 return Err(ErrorCode::NotSupported(
560 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
561 "use drop instead".to_owned(),
562 )
563 .into());
564 }
565 let &schema_id = self
566 .table_id_to_schema_id
567 .read()
568 .get(&index_id.as_raw_id())
569 .unwrap();
570 let database_id = self.get_database_id_by_schema(schema_id);
571
572 let index = {
573 let catalog_reader = self.catalog.read();
574 let schema_catalog = catalog_reader
575 .get_schema_by_id(database_id, schema_id)
576 .unwrap();
577 schema_catalog.get_index_by_id(index_id).unwrap().clone()
578 };
579
580 let index_table_id = index.index_table().id;
581 let (database_id, schema_id) = self.drop_table_or_index_id(index_id.as_raw_id());
582 self.catalog
583 .write()
584 .drop_index(database_id, schema_id, index_id);
585 self.catalog
586 .write()
587 .drop_table(database_id, schema_id, index_table_id);
588 Ok(())
589 }
590
591 async fn drop_function(&self, _function_id: FunctionId, _cascade: bool) -> Result<()> {
592 unreachable!()
593 }
594
595 async fn drop_connection(&self, _connection_id: ConnectionId, _cascade: bool) -> Result<()> {
596 unreachable!()
597 }
598
599 async fn drop_secret(&self, _secret_id: SecretId) -> Result<()> {
600 unreachable!()
601 }
602
603 async fn drop_database(&self, database_id: DatabaseId) -> Result<()> {
604 self.catalog.write().drop_database(database_id);
605 Ok(())
606 }
607
608 async fn drop_schema(&self, schema_id: SchemaId, _cascade: bool) -> Result<()> {
609 let database_id = self.drop_schema_id(schema_id);
610 self.catalog.write().drop_schema(database_id, schema_id);
611 Ok(())
612 }
613
614 async fn alter_name(
615 &self,
616 object_id: alter_name_request::Object,
617 object_name: &str,
618 ) -> Result<()> {
619 match object_id {
620 alter_name_request::Object::TableId(table_id) => {
621 self.catalog
622 .write()
623 .alter_table_name_by_id(table_id.into(), object_name);
624 Ok(())
625 }
626 _ => {
627 unimplemented!()
628 }
629 }
630 }
631
632 async fn alter_source(&self, source: PbSource) -> Result<()> {
633 self.catalog.write().update_source(&source);
634 Ok(())
635 }
636
637 async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<()> {
638 for database in self.catalog.read().iter_databases() {
639 for schema in database.iter_schemas() {
640 match object {
641 Object::TableId(table_id) => {
642 if let Some(table) = schema.get_created_table_by_id(TableId::from(table_id))
643 {
644 let mut pb_table = table.to_prost();
645 pb_table.owner = owner_id;
646 self.catalog.write().update_table(&pb_table);
647 return Ok(());
648 }
649 }
650 _ => unreachable!(),
651 }
652 }
653 }
654
655 Err(ErrorCode::ItemNotFound(format!("object not found: {:?}", object)).into())
656 }
657
658 async fn alter_set_schema(
659 &self,
660 object: alter_set_schema_request::Object,
661 new_schema_id: SchemaId,
662 ) -> Result<()> {
663 match object {
664 alter_set_schema_request::Object::TableId(table_id) => {
665 let mut pb_table = {
666 let reader = self.catalog.read();
667 let table = reader.get_any_table_by_id(table_id.into())?.to_owned();
668 table.to_prost()
669 };
670 pb_table.schema_id = new_schema_id;
671 self.catalog.write().update_table(&pb_table);
672 self.table_id_to_schema_id
673 .write()
674 .insert(table_id, new_schema_id);
675 Ok(())
676 }
677 _ => unreachable!(),
678 }
679 }
680
681 async fn alter_parallelism(
682 &self,
683 _job_id: JobId,
684 _parallelism: PbTableParallelism,
685 _deferred: bool,
686 ) -> Result<()> {
687 todo!()
688 }
689
690 async fn alter_config(
691 &self,
692 _job_id: JobId,
693 _entries_to_add: HashMap<String, String>,
694 _keys_to_remove: Vec<String>,
695 ) -> Result<()> {
696 todo!()
697 }
698
699 async fn alter_swap_rename(&self, _object: alter_swap_rename_request::Object) -> Result<()> {
700 todo!()
701 }
702
703 async fn alter_secret(
704 &self,
705 _secret_id: SecretId,
706 _secret_name: String,
707 _database_id: DatabaseId,
708 _schema_id: SchemaId,
709 _owner_id: u32,
710 _payload: Vec<u8>,
711 ) -> Result<()> {
712 unreachable!()
713 }
714
715 async fn alter_resource_group(
716 &self,
717 _table_id: TableId,
718 _resource_group: Option<String>,
719 _deferred: bool,
720 ) -> Result<()> {
721 todo!()
722 }
723
724 async fn alter_database_param(
725 &self,
726 database_id: DatabaseId,
727 param: AlterDatabaseParam,
728 ) -> Result<()> {
729 let mut pb_database = {
730 let reader = self.catalog.read();
731 let database = reader.get_database_by_id(database_id)?.to_owned();
732 database.to_prost()
733 };
734 match param {
735 AlterDatabaseParam::BarrierIntervalMs(interval) => {
736 pb_database.barrier_interval_ms = interval;
737 }
738 AlterDatabaseParam::CheckpointFrequency(frequency) => {
739 pb_database.checkpoint_frequency = frequency;
740 }
741 }
742 self.catalog.write().update_database(&pb_database);
743 Ok(())
744 }
745
746 async fn create_iceberg_table(
747 &self,
748 _table_job_info: PbTableJobInfo,
749 _sink_job_info: PbSinkJobInfo,
750 _iceberg_source: PbSource,
751 _if_not_exists: bool,
752 ) -> Result<()> {
753 todo!()
754 }
755}
756
757impl MockCatalogWriter {
758 pub fn new(
759 catalog: Arc<RwLock<Catalog>>,
760 hummock_snapshot_manager: HummockSnapshotManagerRef,
761 ) -> Self {
762 catalog.write().create_database(&PbDatabase {
763 id: 0.into(),
764 name: DEFAULT_DATABASE_NAME.to_owned(),
765 owner: DEFAULT_SUPER_USER_ID,
766 resource_group: DEFAULT_RESOURCE_GROUP.to_owned(),
767 barrier_interval_ms: None,
768 checkpoint_frequency: None,
769 });
770 catalog.write().create_schema(&PbSchema {
771 id: 1.into(),
772 name: DEFAULT_SCHEMA_NAME.to_owned(),
773 database_id: 0.into(),
774 owner: DEFAULT_SUPER_USER_ID,
775 });
776 catalog.write().create_schema(&PbSchema {
777 id: 2.into(),
778 name: PG_CATALOG_SCHEMA_NAME.to_owned(),
779 database_id: 0.into(),
780 owner: DEFAULT_SUPER_USER_ID,
781 });
782 catalog.write().create_schema(&PbSchema {
783 id: 3.into(),
784 name: RW_CATALOG_SCHEMA_NAME.to_owned(),
785 database_id: 0.into(),
786 owner: DEFAULT_SUPER_USER_ID,
787 });
788 let mut map: HashMap<SchemaId, DatabaseId> = HashMap::new();
789 map.insert(1_u32.into(), 0_u32.into());
790 map.insert(2_u32.into(), 0_u32.into());
791 map.insert(3_u32.into(), 0_u32.into());
792 Self {
793 catalog,
794 id: AtomicU32::new(3),
795 table_id_to_schema_id: Default::default(),
796 schema_id_to_database_id: RwLock::new(map),
797 hummock_snapshot_manager,
798 }
799 }
800
801 fn gen_id<T: From<u32>>(&self) -> T {
802 (self.id.fetch_add(1, Ordering::SeqCst) + 1).into()
804 }
805
806 fn add_table_or_source_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
807 self.table_id_to_schema_id
808 .write()
809 .insert(table_id, schema_id);
810 }
811
812 fn drop_table_or_source_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
813 let schema_id = self
814 .table_id_to_schema_id
815 .write()
816 .remove(&table_id)
817 .unwrap();
818 (self.get_database_id_by_schema(schema_id), schema_id)
819 }
820
821 fn add_table_or_sink_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
822 self.table_id_to_schema_id
823 .write()
824 .insert(table_id, schema_id);
825 }
826
827 fn add_table_or_subscription_id(
828 &self,
829 table_id: u32,
830 schema_id: SchemaId,
831 _database_id: DatabaseId,
832 ) {
833 self.table_id_to_schema_id
834 .write()
835 .insert(table_id, schema_id);
836 }
837
838 fn add_table_or_index_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
839 self.table_id_to_schema_id
840 .write()
841 .insert(table_id, schema_id);
842 }
843
844 fn drop_table_or_sink_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
845 let schema_id = self
846 .table_id_to_schema_id
847 .write()
848 .remove(&table_id)
849 .unwrap();
850 (self.get_database_id_by_schema(schema_id), schema_id)
851 }
852
853 fn drop_table_or_subscription_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
854 let schema_id = self
855 .table_id_to_schema_id
856 .write()
857 .remove(&table_id)
858 .unwrap();
859 (self.get_database_id_by_schema(schema_id), schema_id)
860 }
861
862 fn drop_table_or_index_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
863 let schema_id = self
864 .table_id_to_schema_id
865 .write()
866 .remove(&table_id)
867 .unwrap();
868 (self.get_database_id_by_schema(schema_id), schema_id)
869 }
870
871 fn add_schema_id(&self, schema_id: SchemaId, database_id: DatabaseId) {
872 self.schema_id_to_database_id
873 .write()
874 .insert(schema_id, database_id);
875 }
876
877 fn drop_schema_id(&self, schema_id: SchemaId) -> DatabaseId {
878 self.schema_id_to_database_id
879 .write()
880 .remove(&schema_id)
881 .unwrap()
882 }
883
884 fn create_source_inner(&self, mut source: PbSource) -> Result<SourceId> {
885 source.id = self.gen_id();
886 self.catalog.write().create_source(&source);
887 self.add_table_or_source_id(source.id.as_raw_id(), source.schema_id, source.database_id);
888 Ok(source.id)
889 }
890
891 fn create_sink_inner(&self, mut sink: PbSink, _graph: StreamFragmentGraph) -> Result<()> {
892 sink.id = self.gen_id();
893 sink.stream_job_status = PbStreamJobStatus::Created as _;
894 self.catalog.write().create_sink(&sink);
895 self.add_table_or_sink_id(sink.id.as_raw_id(), sink.schema_id, sink.database_id);
896 Ok(())
897 }
898
899 fn create_subscription_inner(&self, mut subscription: PbSubscription) -> Result<()> {
900 subscription.id = self.gen_id();
901 self.catalog.write().create_subscription(&subscription);
902 self.add_table_or_subscription_id(
903 subscription.id.as_raw_id(),
904 subscription.schema_id,
905 subscription.database_id,
906 );
907 Ok(())
908 }
909
910 fn get_database_id_by_schema(&self, schema_id: SchemaId) -> DatabaseId {
911 *self
912 .schema_id_to_database_id
913 .read()
914 .get(&schema_id)
915 .unwrap()
916 }
917}
918
919pub struct MockUserInfoWriter {
920 id: AtomicU32,
921 user_info: Arc<RwLock<UserInfoManager>>,
922}
923
924#[async_trait::async_trait]
925impl UserInfoWriter for MockUserInfoWriter {
926 async fn create_user(&self, user: UserInfo) -> Result<()> {
927 let mut user = user;
928 user.id = self.gen_id();
929 self.user_info.write().create_user(user);
930 Ok(())
931 }
932
933 async fn drop_user(&self, id: UserId) -> Result<()> {
934 self.user_info.write().drop_user(id);
935 Ok(())
936 }
937
938 async fn update_user(
939 &self,
940 update_user: UserInfo,
941 update_fields: Vec<UpdateField>,
942 ) -> Result<()> {
943 let mut lock = self.user_info.write();
944 let id = update_user.get_id();
945 let Some(old_name) = lock.get_user_name_by_id(id) else {
946 return Ok(());
947 };
948 let mut user_info = lock.get_user_by_name(&old_name).unwrap().to_prost();
949 update_fields.into_iter().for_each(|field| match field {
950 UpdateField::Super => user_info.is_super = update_user.is_super,
951 UpdateField::Login => user_info.can_login = update_user.can_login,
952 UpdateField::CreateDb => user_info.can_create_db = update_user.can_create_db,
953 UpdateField::CreateUser => user_info.can_create_user = update_user.can_create_user,
954 UpdateField::AuthInfo => user_info.auth_info.clone_from(&update_user.auth_info),
955 UpdateField::Rename => user_info.name.clone_from(&update_user.name),
956 UpdateField::Admin => user_info.is_admin = update_user.is_admin,
957 UpdateField::Unspecified => unreachable!(),
958 });
959 lock.update_user(update_user);
960 Ok(())
961 }
962
963 async fn grant_privilege(
966 &self,
967 users: Vec<UserId>,
968 privileges: Vec<GrantPrivilege>,
969 with_grant_option: bool,
970 _grantor: UserId,
971 ) -> Result<()> {
972 let privileges = privileges
973 .into_iter()
974 .map(|mut p| {
975 p.action_with_opts
976 .iter_mut()
977 .for_each(|ao| ao.with_grant_option = with_grant_option);
978 p
979 })
980 .collect::<Vec<_>>();
981 for user_id in users {
982 if let Some(u) = self.user_info.write().get_user_mut(user_id) {
983 u.extend_privileges(privileges.clone());
984 }
985 }
986 Ok(())
987 }
988
989 async fn revoke_privilege(
992 &self,
993 users: Vec<UserId>,
994 privileges: Vec<GrantPrivilege>,
995 _granted_by: UserId,
996 _revoke_by: UserId,
997 revoke_grant_option: bool,
998 _cascade: bool,
999 ) -> Result<()> {
1000 for user_id in users {
1001 if let Some(u) = self.user_info.write().get_user_mut(user_id) {
1002 u.revoke_privileges(privileges.clone(), revoke_grant_option);
1003 }
1004 }
1005 Ok(())
1006 }
1007
1008 async fn alter_default_privilege(
1009 &self,
1010 _users: Vec<UserId>,
1011 _database_id: DatabaseId,
1012 _schemas: Vec<SchemaId>,
1013 _operation: AlterDefaultPrivilegeOperation,
1014 _operated_by: UserId,
1015 ) -> Result<()> {
1016 todo!()
1017 }
1018}
1019
1020impl MockUserInfoWriter {
1021 pub fn new(user_info: Arc<RwLock<UserInfoManager>>) -> Self {
1022 user_info.write().create_user(UserInfo {
1023 id: DEFAULT_SUPER_USER_ID,
1024 name: DEFAULT_SUPER_USER.to_owned(),
1025 is_super: true,
1026 can_create_db: true,
1027 can_create_user: true,
1028 can_login: true,
1029 ..Default::default()
1030 });
1031 user_info.write().create_user(UserInfo {
1032 id: DEFAULT_SUPER_USER_FOR_ADMIN_ID,
1033 name: DEFAULT_SUPER_USER_FOR_ADMIN.to_owned(),
1034 is_super: true,
1035 can_create_db: true,
1036 can_create_user: true,
1037 can_login: true,
1038 is_admin: true,
1039 ..Default::default()
1040 });
1041 Self {
1042 user_info,
1043 id: AtomicU32::new(NON_RESERVED_USER_ID as u32),
1044 }
1045 }
1046
1047 fn gen_id(&self) -> u32 {
1048 self.id.fetch_add(1, Ordering::SeqCst)
1049 }
1050}
1051
1052pub struct MockFrontendMetaClient {}
1053
1054#[async_trait::async_trait]
1055impl FrontendMetaClient for MockFrontendMetaClient {
1056 async fn try_unregister(&self) {}
1057
1058 async fn flush(&self, _database_id: DatabaseId) -> RpcResult<HummockVersionId> {
1059 Ok(INVALID_VERSION_ID)
1060 }
1061
1062 async fn wait(&self) -> RpcResult<()> {
1063 Ok(())
1064 }
1065
1066 async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult<Vec<u32>> {
1067 Ok(vec![])
1068 }
1069
1070 async fn list_table_fragments(
1071 &self,
1072 _table_ids: &[JobId],
1073 ) -> RpcResult<HashMap<JobId, TableFragmentInfo>> {
1074 Ok(HashMap::default())
1075 }
1076
1077 async fn list_streaming_job_states(&self) -> RpcResult<Vec<StreamingJobState>> {
1078 Ok(vec![])
1079 }
1080
1081 async fn list_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
1082 Ok(vec![])
1083 }
1084
1085 async fn list_creating_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
1086 Ok(vec![])
1087 }
1088
1089 async fn list_actor_states(&self) -> RpcResult<Vec<ActorState>> {
1090 Ok(vec![])
1091 }
1092
1093 async fn list_actor_splits(&self) -> RpcResult<Vec<ActorSplit>> {
1094 Ok(vec![])
1095 }
1096
1097 async fn list_object_dependencies(&self) -> RpcResult<Vec<PbObjectDependencies>> {
1098 Ok(vec![])
1099 }
1100
1101 async fn list_meta_snapshots(&self) -> RpcResult<Vec<MetaSnapshotMetadata>> {
1102 Ok(vec![])
1103 }
1104
1105 async fn set_system_param(
1106 &self,
1107 _param: String,
1108 _value: Option<String>,
1109 ) -> RpcResult<Option<SystemParamsReader>> {
1110 Ok(Some(SystemParams::default().into()))
1111 }
1112
1113 async fn get_session_params(&self) -> RpcResult<SessionConfig> {
1114 Ok(Default::default())
1115 }
1116
1117 async fn set_session_param(&self, _param: String, _value: Option<String>) -> RpcResult<String> {
1118 Ok("".to_owned())
1119 }
1120
1121 async fn get_ddl_progress(&self) -> RpcResult<Vec<DdlProgress>> {
1122 Ok(vec![])
1123 }
1124
1125 async fn get_tables(
1126 &self,
1127 _table_ids: Vec<crate::catalog::TableId>,
1128 _include_dropped_tables: bool,
1129 ) -> RpcResult<HashMap<crate::catalog::TableId, Table>> {
1130 Ok(HashMap::new())
1131 }
1132
1133 async fn list_hummock_pinned_versions(&self) -> RpcResult<Vec<(WorkerId, u64)>> {
1134 unimplemented!()
1135 }
1136
1137 async fn list_refresh_table_states(&self) -> RpcResult<Vec<RefreshTableState>> {
1138 unimplemented!()
1139 }
1140
1141 async fn get_hummock_current_version(&self) -> RpcResult<HummockVersion> {
1142 Ok(HummockVersion::default())
1143 }
1144
1145 async fn get_hummock_checkpoint_version(&self) -> RpcResult<HummockVersion> {
1146 unimplemented!()
1147 }
1148
1149 async fn list_version_deltas(&self) -> RpcResult<Vec<HummockVersionDelta>> {
1150 unimplemented!()
1151 }
1152
1153 async fn list_branched_objects(&self) -> RpcResult<Vec<BranchedObject>> {
1154 unimplemented!()
1155 }
1156
1157 async fn list_hummock_compaction_group_configs(&self) -> RpcResult<Vec<CompactionGroupInfo>> {
1158 unimplemented!()
1159 }
1160
1161 async fn list_hummock_active_write_limits(&self) -> RpcResult<HashMap<u64, WriteLimit>> {
1162 unimplemented!()
1163 }
1164
1165 async fn list_hummock_meta_configs(&self) -> RpcResult<HashMap<String, String>> {
1166 unimplemented!()
1167 }
1168
1169 async fn list_event_log(&self) -> RpcResult<Vec<EventLog>> {
1170 Ok(vec![])
1171 }
1172
1173 async fn list_compact_task_assignment(&self) -> RpcResult<Vec<CompactTaskAssignment>> {
1174 unimplemented!()
1175 }
1176
1177 async fn list_all_nodes(&self) -> RpcResult<Vec<WorkerNode>> {
1178 Ok(vec![])
1179 }
1180
1181 async fn list_compact_task_progress(&self) -> RpcResult<Vec<CompactTaskProgress>> {
1182 unimplemented!()
1183 }
1184
1185 async fn recover(&self) -> RpcResult<()> {
1186 unimplemented!()
1187 }
1188
1189 async fn apply_throttle(
1190 &self,
1191 _kind: PbThrottleTarget,
1192 _id: u32,
1193 _rate_limit: Option<u32>,
1194 ) -> RpcResult<()> {
1195 unimplemented!()
1196 }
1197
1198 async fn alter_fragment_parallelism(
1199 &self,
1200 _fragment_ids: Vec<FragmentId>,
1201 _parallelism: Option<PbTableParallelism>,
1202 ) -> RpcResult<()> {
1203 unimplemented!()
1204 }
1205
1206 async fn get_cluster_recovery_status(&self) -> RpcResult<RecoveryStatus> {
1207 Ok(RecoveryStatus::StatusRunning)
1208 }
1209
1210 async fn get_cluster_limits(&self) -> RpcResult<Vec<ClusterLimit>> {
1211 Ok(vec![])
1212 }
1213
1214 async fn list_rate_limits(&self) -> RpcResult<Vec<RateLimitInfo>> {
1215 Ok(vec![])
1216 }
1217
1218 async fn list_cdc_progress(&self) -> RpcResult<HashMap<JobId, PbCdcProgress>> {
1219 Ok(HashMap::default())
1220 }
1221
1222 async fn get_meta_store_endpoint(&self) -> RpcResult<String> {
1223 unimplemented!()
1224 }
1225
1226 async fn alter_sink_props(
1227 &self,
1228 _sink_id: SinkId,
1229 _changed_props: BTreeMap<String, String>,
1230 _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1231 _connector_conn_ref: Option<ConnectionId>,
1232 ) -> RpcResult<()> {
1233 unimplemented!()
1234 }
1235
1236 async fn alter_iceberg_table_props(
1237 &self,
1238 _table_id: TableId,
1239 _sink_id: SinkId,
1240 _source_id: SourceId,
1241 _changed_props: BTreeMap<String, String>,
1242 _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1243 _connector_conn_ref: Option<ConnectionId>,
1244 ) -> RpcResult<()> {
1245 unimplemented!()
1246 }
1247
1248 async fn alter_source_connector_props(
1249 &self,
1250 _source_id: SourceId,
1251 _changed_props: BTreeMap<String, String>,
1252 _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1253 _connector_conn_ref: Option<ConnectionId>,
1254 ) -> RpcResult<()> {
1255 unimplemented!()
1256 }
1257
1258 async fn alter_connection_connector_props(
1259 &self,
1260 _connection_id: u32,
1261 _changed_props: BTreeMap<String, String>,
1262 _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1263 ) -> RpcResult<()> {
1264 Ok(())
1265 }
1266
1267 async fn list_hosted_iceberg_tables(&self) -> RpcResult<Vec<IcebergTable>> {
1268 unimplemented!()
1269 }
1270
1271 async fn get_fragment_by_id(
1272 &self,
1273 _fragment_id: FragmentId,
1274 ) -> RpcResult<Option<FragmentDistribution>> {
1275 unimplemented!()
1276 }
1277
1278 async fn get_fragment_vnodes(
1279 &self,
1280 _fragment_id: FragmentId,
1281 ) -> RpcResult<Vec<(ActorId, Vec<u32>)>> {
1282 unimplemented!()
1283 }
1284
1285 async fn get_actor_vnodes(&self, _actor_id: ActorId) -> RpcResult<Vec<u32>> {
1286 unimplemented!()
1287 }
1288
1289 fn worker_id(&self) -> WorkerId {
1290 0.into()
1291 }
1292
1293 async fn set_sync_log_store_aligned(&self, _job_id: JobId, _aligned: bool) -> RpcResult<()> {
1294 Ok(())
1295 }
1296
1297 async fn compact_iceberg_table(&self, _sink_id: SinkId) -> RpcResult<u64> {
1298 Ok(1)
1299 }
1300
1301 async fn expire_iceberg_table_snapshots(&self, _sink_id: SinkId) -> RpcResult<()> {
1302 Ok(())
1303 }
1304
1305 async fn refresh(&self, _request: RefreshRequest) -> RpcResult<RefreshResponse> {
1306 Ok(RefreshResponse { status: None })
1307 }
1308
1309 fn cluster_id(&self) -> &str {
1310 "test-cluster-uuid"
1311 }
1312
1313 async fn list_unmigrated_tables(&self) -> RpcResult<HashMap<crate::catalog::TableId, String>> {
1314 unimplemented!()
1315 }
1316}
1317
1318#[cfg(test)]
1319pub static PROTO_FILE_DATA: &str = r#"
1320 syntax = "proto3";
1321 package test;
1322 message TestRecord {
1323 int32 id = 1;
1324 Country country = 3;
1325 int64 zipcode = 4;
1326 float rate = 5;
1327 }
1328 message TestRecordAlterType {
1329 string id = 1;
1330 Country country = 3;
1331 int32 zipcode = 4;
1332 float rate = 5;
1333 }
1334 message TestRecordExt {
1335 int32 id = 1;
1336 Country country = 3;
1337 int64 zipcode = 4;
1338 float rate = 5;
1339 string name = 6;
1340 }
1341 message Country {
1342 string address = 1;
1343 City city = 2;
1344 string zipcode = 3;
1345 }
1346 message City {
1347 string address = 1;
1348 string zipcode = 2;
1349 }"#;
1350
1351pub fn create_proto_file(proto_data: &str) -> NamedTempFile {
1354 let in_file = Builder::new()
1355 .prefix("temp")
1356 .suffix(".proto")
1357 .rand_bytes(8)
1358 .tempfile()
1359 .unwrap();
1360
1361 let out_file = Builder::new()
1362 .prefix("temp")
1363 .suffix(".pb")
1364 .rand_bytes(8)
1365 .tempfile()
1366 .unwrap();
1367
1368 let mut file = in_file.as_file();
1369 file.write_all(proto_data.as_ref())
1370 .expect("writing binary to test file");
1371 file.flush().expect("flush temp file failed");
1372 let include_path = in_file
1373 .path()
1374 .parent()
1375 .unwrap()
1376 .to_string_lossy()
1377 .into_owned();
1378 let out_path = out_file.path().to_string_lossy().into_owned();
1379 let in_path = in_file.path().to_string_lossy().into_owned();
1380 let mut compile = std::process::Command::new("protoc");
1381
1382 let out = compile
1383 .arg("--include_imports")
1384 .arg("-I")
1385 .arg(include_path)
1386 .arg(format!("--descriptor_set_out={}", out_path))
1387 .arg(in_path)
1388 .output()
1389 .expect("failed to compile proto");
1390 if !out.status.success() {
1391 panic!("compile proto failed \n output: {:?}", out);
1392 }
1393 out_file
1394}