1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::io::Write;
17use std::net::{IpAddr, Ipv4Addr, SocketAddr};
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU32, Ordering};
20
21use futures_async_stream::for_await;
22use parking_lot::RwLock;
23use pgwire::net::{Address, AddressRef};
24use pgwire::pg_response::StatementType;
25use pgwire::pg_server::{BoxedError, SessionId, SessionManager, UserAuthenticator};
26use pgwire::types::Row;
27use risingwave_common::catalog::{
28 AlterDatabaseParam, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER,
29 DEFAULT_SUPER_USER_FOR_ADMIN, DEFAULT_SUPER_USER_FOR_ADMIN_ID, DEFAULT_SUPER_USER_ID,
30 FunctionId, IndexId, NON_RESERVED_USER_ID, ObjectId, PG_CATALOG_SCHEMA_NAME,
31 RW_CATALOG_SCHEMA_NAME, TableId,
32};
33use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat};
34use risingwave_common::id::{ConnectionId, JobId, SourceId, SubscriptionId, ViewId, WorkerId};
35use risingwave_common::session_config::SessionConfig;
36use risingwave_common::system_param::reader::SystemParamsReader;
37use risingwave_common::util::cluster_limit::ClusterLimit;
38use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
39use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
40use risingwave_hummock_sdk::{HummockVersionId, INVALID_VERSION_ID};
41use risingwave_pb::backup_service::MetaSnapshotMetadata;
42use risingwave_pb::catalog::{
43 PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbStreamJobStatus,
44 PbSubscription, PbTable, PbView, Table,
45};
46use risingwave_pb::common::WorkerNode;
47use risingwave_pb::ddl_service::alter_owner_request::Object;
48use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
49use risingwave_pb::ddl_service::{
50 DdlProgress, PbTableJobType, TableJobType, alter_name_request, alter_set_schema_request,
51 alter_swap_rename_request, create_connection_request,
52};
53use risingwave_pb::hummock::write_limits::WriteLimit;
54use risingwave_pb::hummock::{
55 BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
56};
57use risingwave_pb::id::ActorId;
58use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
59use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
60use risingwave_pb::meta::list_actor_states_response::ActorState;
61use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
62use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
63use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
64use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
65use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
66use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
67use risingwave_pb::meta::{
68 EventLog, FragmentDistribution, PbTableParallelism, PbThrottleTarget, RecoveryStatus,
69 RefreshRequest, RefreshResponse, SystemParams,
70};
71use risingwave_pb::secret::PbSecretRef;
72use risingwave_pb::stream_plan::StreamFragmentGraph;
73use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
74use risingwave_pb::user::update_user_request::UpdateField;
75use risingwave_pb::user::{GrantPrivilege, UserInfo};
76use risingwave_rpc_client::error::Result as RpcResult;
77use tempfile::{Builder, NamedTempFile};
78
79use crate::FrontendOpts;
80use crate::catalog::catalog_service::CatalogWriter;
81use crate::catalog::root_catalog::Catalog;
82use crate::catalog::{DatabaseId, FragmentId, SchemaId, SecretId, SinkId};
83use crate::error::{ErrorCode, Result};
84use crate::handler::RwPgResponse;
85use crate::meta_client::FrontendMetaClient;
86use crate::scheduler::HummockSnapshotManagerRef;
87use crate::session::{AuthContext, FrontendEnv, SessionImpl};
88use crate::user::UserId;
89use crate::user::user_manager::UserInfoManager;
90use crate::user::user_service::UserInfoWriter;
91
92pub struct LocalFrontend {
94 pub opts: FrontendOpts,
95 env: FrontendEnv,
96}
97
98impl SessionManager for LocalFrontend {
99 type Session = SessionImpl;
100
101 fn create_dummy_session(
102 &self,
103 _database_id: DatabaseId,
104 _user_name: u32,
105 ) -> std::result::Result<Arc<Self::Session>, BoxedError> {
106 unreachable!()
107 }
108
109 fn connect(
110 &self,
111 _database: &str,
112 _user_name: &str,
113 _peer_addr: AddressRef,
114 ) -> std::result::Result<Arc<Self::Session>, BoxedError> {
115 Ok(self.session_ref())
116 }
117
118 fn cancel_queries_in_session(&self, _session_id: SessionId) {
119 unreachable!()
120 }
121
122 fn cancel_creating_jobs_in_session(&self, _session_id: SessionId) {
123 unreachable!()
124 }
125
126 fn end_session(&self, _session: &Self::Session) {
127 unreachable!()
128 }
129}
130
131impl LocalFrontend {
132 #[expect(clippy::unused_async)]
133 pub async fn new(opts: FrontendOpts) -> Self {
134 let env = FrontendEnv::mock();
135 Self { opts, env }
136 }
137
138 pub async fn run_sql(
139 &self,
140 sql: impl Into<String>,
141 ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
142 let sql: Arc<str> = Arc::from(sql.into());
143 self.session_ref().run_statement(sql, vec![]).await
144 }
145
146 pub async fn run_sql_with_session(
147 &self,
148 session_ref: Arc<SessionImpl>,
149 sql: impl Into<String>,
150 ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
151 let sql: Arc<str> = Arc::from(sql.into());
152 session_ref.run_statement(sql, vec![]).await
153 }
154
155 pub async fn run_user_sql(
156 &self,
157 sql: impl Into<String>,
158 database: String,
159 user_name: String,
160 user_id: UserId,
161 ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
162 let sql: Arc<str> = Arc::from(sql.into());
163 self.session_user_ref(database, user_name, user_id)
164 .run_statement(sql, vec![])
165 .await
166 }
167
168 pub async fn query_formatted_result(&self, sql: impl Into<String>) -> Vec<String> {
169 let mut rsp = self.run_sql(sql).await.unwrap();
170 let mut res = vec![];
171 #[for_await]
172 for row_set in rsp.values_stream() {
173 for row in row_set.unwrap() {
174 res.push(format!("{:?}", row));
175 }
176 }
177 res
178 }
179
180 pub async fn get_explain_output(&self, sql: impl Into<String>) -> String {
181 let mut rsp = self.run_sql(sql).await.unwrap();
182 assert_eq!(rsp.stmt_type(), StatementType::EXPLAIN);
183 let mut res = String::new();
184 #[for_await]
185 for row_set in rsp.values_stream() {
186 for row in row_set.unwrap() {
187 let row: Row = row;
188 let row = row.values()[0].as_ref().unwrap();
189 res += std::str::from_utf8(row).unwrap();
190 res += "\n";
191 }
192 }
193 res
194 }
195
196 pub fn session_ref(&self) -> Arc<SessionImpl> {
198 self.session_user_ref(
199 DEFAULT_DATABASE_NAME.to_owned(),
200 DEFAULT_SUPER_USER.to_owned(),
201 DEFAULT_SUPER_USER_ID,
202 )
203 }
204
205 pub fn session_user_ref(
206 &self,
207 database: String,
208 user_name: String,
209 user_id: UserId,
210 ) -> Arc<SessionImpl> {
211 Arc::new(SessionImpl::new(
212 self.env.clone(),
213 AuthContext::new(database, user_name, user_id),
214 UserAuthenticator::None,
215 (0, 0),
217 Address::Tcp(SocketAddr::new(
218 IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
219 6666,
220 ))
221 .into(),
222 Default::default(),
223 ))
224 }
225}
226
227pub async fn get_explain_output(mut rsp: RwPgResponse) -> String {
228 if rsp.stmt_type() != StatementType::EXPLAIN {
229 panic!("RESPONSE INVALID: {rsp:?}");
230 }
231 let mut res = String::new();
232 #[for_await]
233 for row_set in rsp.values_stream() {
234 for row in row_set.unwrap() {
235 let row: Row = row;
236 let row = row.values()[0].as_ref().unwrap();
237 res += std::str::from_utf8(row).unwrap();
238 res += "\n";
239 }
240 }
241 res
242}
243
244pub struct MockCatalogWriter {
245 catalog: Arc<RwLock<Catalog>>,
246 id: AtomicU32,
247 table_id_to_schema_id: RwLock<HashMap<u32, SchemaId>>,
248 schema_id_to_database_id: RwLock<HashMap<SchemaId, DatabaseId>>,
249 hummock_snapshot_manager: HummockSnapshotManagerRef,
250}
251
252#[async_trait::async_trait]
253impl CatalogWriter for MockCatalogWriter {
254 async fn create_database(
255 &self,
256 db_name: &str,
257 owner: UserId,
258 resource_group: &str,
259 barrier_interval_ms: Option<u32>,
260 checkpoint_frequency: Option<u64>,
261 ) -> Result<()> {
262 let database_id = DatabaseId::new(self.gen_id());
263 self.catalog.write().create_database(&PbDatabase {
264 name: db_name.to_owned(),
265 id: database_id,
266 owner,
267 resource_group: resource_group.to_owned(),
268 barrier_interval_ms,
269 checkpoint_frequency,
270 });
271 self.create_schema(database_id, DEFAULT_SCHEMA_NAME, owner)
272 .await?;
273 self.create_schema(database_id, PG_CATALOG_SCHEMA_NAME, owner)
274 .await?;
275 self.create_schema(database_id, RW_CATALOG_SCHEMA_NAME, owner)
276 .await?;
277 Ok(())
278 }
279
280 async fn create_schema(
281 &self,
282 db_id: DatabaseId,
283 schema_name: &str,
284 owner: UserId,
285 ) -> Result<()> {
286 let id = self.gen_id();
287 self.catalog.write().create_schema(&PbSchema {
288 id,
289 name: schema_name.to_owned(),
290 database_id: db_id,
291 owner,
292 });
293 self.add_schema_id(id, db_id);
294 Ok(())
295 }
296
297 async fn create_materialized_view(
298 &self,
299 mut table: PbTable,
300 _graph: StreamFragmentGraph,
301 _dependencies: HashSet<ObjectId>,
302 _specific_resource_group: Option<String>,
303 _if_not_exists: bool,
304 ) -> Result<()> {
305 table.id = self.gen_id();
306 table.stream_job_status = PbStreamJobStatus::Created as _;
307 table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
308 self.catalog.write().create_table(&table);
309 self.add_table_or_source_id(table.id.as_raw_id(), table.schema_id, table.database_id);
310 self.hummock_snapshot_manager.add_table_for_test(table.id);
311 Ok(())
312 }
313
314 async fn replace_materialized_view(
315 &self,
316 mut table: PbTable,
317 _graph: StreamFragmentGraph,
318 ) -> Result<()> {
319 table.stream_job_status = PbStreamJobStatus::Created as _;
320 assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
321 self.catalog.write().update_table(&table);
322 Ok(())
323 }
324
325 async fn create_view(&self, mut view: PbView, _dependencies: HashSet<ObjectId>) -> Result<()> {
326 view.id = self.gen_id();
327 self.catalog.write().create_view(&view);
328 self.add_table_or_source_id(view.id.as_raw_id(), view.schema_id, view.database_id);
329 Ok(())
330 }
331
332 async fn create_table(
333 &self,
334 source: Option<PbSource>,
335 mut table: PbTable,
336 graph: StreamFragmentGraph,
337 _job_type: PbTableJobType,
338 if_not_exists: bool,
339 _dependencies: HashSet<ObjectId>,
340 ) -> Result<()> {
341 if let Some(source) = source {
342 let source_id = self.create_source_inner(source)?;
343 table.optional_associated_source_id = Some(source_id.into());
344 }
345 self.create_materialized_view(table, graph, HashSet::new(), None, if_not_exists)
346 .await?;
347 Ok(())
348 }
349
350 async fn replace_table(
351 &self,
352 _source: Option<PbSource>,
353 mut table: PbTable,
354 _graph: StreamFragmentGraph,
355 _job_type: TableJobType,
356 ) -> Result<()> {
357 table.stream_job_status = PbStreamJobStatus::Created as _;
358 assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
359 self.catalog.write().update_table(&table);
360 Ok(())
361 }
362
363 async fn replace_source(&self, source: PbSource, _graph: StreamFragmentGraph) -> Result<()> {
364 self.catalog.write().update_source(&source);
365 Ok(())
366 }
367
368 async fn create_source(
369 &self,
370 source: PbSource,
371 _graph: Option<StreamFragmentGraph>,
372 _if_not_exists: bool,
373 ) -> Result<()> {
374 self.create_source_inner(source).map(|_| ())
375 }
376
377 async fn create_sink(
378 &self,
379 sink: PbSink,
380 graph: StreamFragmentGraph,
381 _dependencies: HashSet<ObjectId>,
382 _if_not_exists: bool,
383 ) -> Result<()> {
384 self.create_sink_inner(sink, graph)
385 }
386
387 async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
388 self.create_subscription_inner(subscription)
389 }
390
391 async fn create_index(
392 &self,
393 mut index: PbIndex,
394 mut index_table: PbTable,
395 _graph: StreamFragmentGraph,
396 _if_not_exists: bool,
397 ) -> Result<()> {
398 index_table.id = self.gen_id();
399 index_table.stream_job_status = PbStreamJobStatus::Created as _;
400 index_table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
401 self.catalog.write().create_table(&index_table);
402 self.add_table_or_index_id(
403 index_table.id.as_raw_id(),
404 index_table.schema_id,
405 index_table.database_id,
406 );
407
408 index.id = index_table.id.as_raw_id().into();
409 index.index_table_id = index_table.id;
410 self.catalog.write().create_index(&index);
411 Ok(())
412 }
413
414 async fn create_function(&self, _function: PbFunction) -> Result<()> {
415 unreachable!()
416 }
417
418 async fn create_connection(
419 &self,
420 _connection_name: String,
421 _database_id: DatabaseId,
422 _schema_id: SchemaId,
423 _owner_id: u32,
424 _connection: create_connection_request::Payload,
425 ) -> Result<()> {
426 unreachable!()
427 }
428
429 async fn create_secret(
430 &self,
431 _secret_name: String,
432 _database_id: DatabaseId,
433 _schema_id: SchemaId,
434 _owner_id: u32,
435 _payload: Vec<u8>,
436 ) -> Result<()> {
437 unreachable!()
438 }
439
440 async fn comment_on(&self, _comment: PbComment) -> Result<()> {
441 unreachable!()
442 }
443
444 async fn drop_table(
445 &self,
446 source_id: Option<u32>,
447 table_id: TableId,
448 cascade: bool,
449 ) -> Result<()> {
450 if cascade {
451 return Err(ErrorCode::NotSupported(
452 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
453 "use drop instead".to_owned(),
454 )
455 .into());
456 }
457 if let Some(source_id) = source_id {
458 self.drop_table_or_source_id(source_id);
459 }
460 let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
461 let indexes =
462 self.catalog
463 .read()
464 .get_all_indexes_related_to_object(database_id, schema_id, table_id);
465 for index in indexes {
466 self.drop_index(index.id, cascade).await?;
467 }
468 self.catalog
469 .write()
470 .drop_table(database_id, schema_id, table_id);
471 if let Some(source_id) = source_id {
472 self.catalog
473 .write()
474 .drop_source(database_id, schema_id, source_id.into());
475 }
476 Ok(())
477 }
478
479 async fn drop_view(&self, _view_id: ViewId, _cascade: bool) -> Result<()> {
480 unreachable!()
481 }
482
483 async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
484 if cascade {
485 return Err(ErrorCode::NotSupported(
486 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
487 "use drop instead".to_owned(),
488 )
489 .into());
490 }
491 let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
492 let indexes =
493 self.catalog
494 .read()
495 .get_all_indexes_related_to_object(database_id, schema_id, table_id);
496 for index in indexes {
497 self.drop_index(index.id, cascade).await?;
498 }
499 self.catalog
500 .write()
501 .drop_table(database_id, schema_id, table_id);
502 Ok(())
503 }
504
505 async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()> {
506 if cascade {
507 return Err(ErrorCode::NotSupported(
508 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
509 "use drop instead".to_owned(),
510 )
511 .into());
512 }
513 let (database_id, schema_id) = self.drop_table_or_source_id(source_id.as_raw_id());
514 self.catalog
515 .write()
516 .drop_source(database_id, schema_id, source_id);
517 Ok(())
518 }
519
520 async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()> {
521 if cascade {
522 return Err(ErrorCode::NotSupported(
523 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
524 "use drop instead".to_owned(),
525 )
526 .into());
527 }
528 let (database_id, schema_id) = self.drop_table_or_sink_id(sink_id.as_raw_id());
529 self.catalog
530 .write()
531 .drop_sink(database_id, schema_id, sink_id);
532 Ok(())
533 }
534
535 async fn drop_subscription(
536 &self,
537 subscription_id: SubscriptionId,
538 cascade: bool,
539 ) -> Result<()> {
540 if cascade {
541 return Err(ErrorCode::NotSupported(
542 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
543 "use drop instead".to_owned(),
544 )
545 .into());
546 }
547 let (database_id, schema_id) =
548 self.drop_table_or_subscription_id(subscription_id.as_raw_id());
549 self.catalog
550 .write()
551 .drop_subscription(database_id, schema_id, subscription_id);
552 Ok(())
553 }
554
555 async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
556 if cascade {
557 return Err(ErrorCode::NotSupported(
558 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
559 "use drop instead".to_owned(),
560 )
561 .into());
562 }
563 let &schema_id = self
564 .table_id_to_schema_id
565 .read()
566 .get(&index_id.as_raw_id())
567 .unwrap();
568 let database_id = self.get_database_id_by_schema(schema_id);
569
570 let index = {
571 let catalog_reader = self.catalog.read();
572 let schema_catalog = catalog_reader
573 .get_schema_by_id(database_id, schema_id)
574 .unwrap();
575 schema_catalog.get_index_by_id(index_id).unwrap().clone()
576 };
577
578 let index_table_id = index.index_table().id;
579 let (database_id, schema_id) = self.drop_table_or_index_id(index_id.as_raw_id());
580 self.catalog
581 .write()
582 .drop_index(database_id, schema_id, index_id);
583 self.catalog
584 .write()
585 .drop_table(database_id, schema_id, index_table_id);
586 Ok(())
587 }
588
589 async fn drop_function(&self, _function_id: FunctionId, _cascade: bool) -> Result<()> {
590 unreachable!()
591 }
592
593 async fn drop_connection(&self, _connection_id: ConnectionId, _cascade: bool) -> Result<()> {
594 unreachable!()
595 }
596
597 async fn drop_secret(&self, _secret_id: SecretId) -> Result<()> {
598 unreachable!()
599 }
600
601 async fn drop_database(&self, database_id: DatabaseId) -> Result<()> {
602 self.catalog.write().drop_database(database_id);
603 Ok(())
604 }
605
606 async fn drop_schema(&self, schema_id: SchemaId, _cascade: bool) -> Result<()> {
607 let database_id = self.drop_schema_id(schema_id);
608 self.catalog.write().drop_schema(database_id, schema_id);
609 Ok(())
610 }
611
612 async fn alter_name(
613 &self,
614 object_id: alter_name_request::Object,
615 object_name: &str,
616 ) -> Result<()> {
617 match object_id {
618 alter_name_request::Object::TableId(table_id) => {
619 self.catalog
620 .write()
621 .alter_table_name_by_id(table_id.into(), object_name);
622 Ok(())
623 }
624 _ => {
625 unimplemented!()
626 }
627 }
628 }
629
630 async fn alter_source(&self, source: PbSource) -> Result<()> {
631 self.catalog.write().update_source(&source);
632 Ok(())
633 }
634
635 async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<()> {
636 for database in self.catalog.read().iter_databases() {
637 for schema in database.iter_schemas() {
638 match object {
639 Object::TableId(table_id) => {
640 if let Some(table) = schema.get_created_table_by_id(TableId::from(table_id))
641 {
642 let mut pb_table = table.to_prost();
643 pb_table.owner = owner_id;
644 self.catalog.write().update_table(&pb_table);
645 return Ok(());
646 }
647 }
648 _ => unreachable!(),
649 }
650 }
651 }
652
653 Err(ErrorCode::ItemNotFound(format!("object not found: {:?}", object)).into())
654 }
655
656 async fn alter_set_schema(
657 &self,
658 object: alter_set_schema_request::Object,
659 new_schema_id: SchemaId,
660 ) -> Result<()> {
661 match object {
662 alter_set_schema_request::Object::TableId(table_id) => {
663 let mut pb_table = {
664 let reader = self.catalog.read();
665 let table = reader.get_any_table_by_id(table_id.into())?.to_owned();
666 table.to_prost()
667 };
668 pb_table.schema_id = new_schema_id;
669 self.catalog.write().update_table(&pb_table);
670 self.table_id_to_schema_id
671 .write()
672 .insert(table_id, new_schema_id);
673 Ok(())
674 }
675 _ => unreachable!(),
676 }
677 }
678
679 async fn alter_parallelism(
680 &self,
681 _job_id: JobId,
682 _parallelism: PbTableParallelism,
683 _deferred: bool,
684 ) -> Result<()> {
685 todo!()
686 }
687
688 async fn alter_swap_rename(&self, _object: alter_swap_rename_request::Object) -> Result<()> {
689 todo!()
690 }
691
692 async fn alter_secret(
693 &self,
694 _secret_id: SecretId,
695 _secret_name: String,
696 _database_id: DatabaseId,
697 _schema_id: SchemaId,
698 _owner_id: u32,
699 _payload: Vec<u8>,
700 ) -> Result<()> {
701 unreachable!()
702 }
703
704 async fn alter_resource_group(
705 &self,
706 _table_id: TableId,
707 _resource_group: Option<String>,
708 _deferred: bool,
709 ) -> Result<()> {
710 todo!()
711 }
712
713 async fn alter_database_param(
714 &self,
715 database_id: DatabaseId,
716 param: AlterDatabaseParam,
717 ) -> Result<()> {
718 let mut pb_database = {
719 let reader = self.catalog.read();
720 let database = reader.get_database_by_id(database_id)?.to_owned();
721 database.to_prost()
722 };
723 match param {
724 AlterDatabaseParam::BarrierIntervalMs(interval) => {
725 pb_database.barrier_interval_ms = interval;
726 }
727 AlterDatabaseParam::CheckpointFrequency(frequency) => {
728 pb_database.checkpoint_frequency = frequency;
729 }
730 }
731 self.catalog.write().update_database(&pb_database);
732 Ok(())
733 }
734
735 async fn create_iceberg_table(
736 &self,
737 _table_job_info: PbTableJobInfo,
738 _sink_job_info: PbSinkJobInfo,
739 _iceberg_source: PbSource,
740 _if_not_exists: bool,
741 ) -> Result<()> {
742 todo!()
743 }
744}
745
746impl MockCatalogWriter {
747 pub fn new(
748 catalog: Arc<RwLock<Catalog>>,
749 hummock_snapshot_manager: HummockSnapshotManagerRef,
750 ) -> Self {
751 catalog.write().create_database(&PbDatabase {
752 id: 0.into(),
753 name: DEFAULT_DATABASE_NAME.to_owned(),
754 owner: DEFAULT_SUPER_USER_ID,
755 resource_group: DEFAULT_RESOURCE_GROUP.to_owned(),
756 barrier_interval_ms: None,
757 checkpoint_frequency: None,
758 });
759 catalog.write().create_schema(&PbSchema {
760 id: 1.into(),
761 name: DEFAULT_SCHEMA_NAME.to_owned(),
762 database_id: 0.into(),
763 owner: DEFAULT_SUPER_USER_ID,
764 });
765 catalog.write().create_schema(&PbSchema {
766 id: 2.into(),
767 name: PG_CATALOG_SCHEMA_NAME.to_owned(),
768 database_id: 0.into(),
769 owner: DEFAULT_SUPER_USER_ID,
770 });
771 catalog.write().create_schema(&PbSchema {
772 id: 3.into(),
773 name: RW_CATALOG_SCHEMA_NAME.to_owned(),
774 database_id: 0.into(),
775 owner: DEFAULT_SUPER_USER_ID,
776 });
777 let mut map: HashMap<SchemaId, DatabaseId> = HashMap::new();
778 map.insert(1_u32.into(), 0_u32.into());
779 map.insert(2_u32.into(), 0_u32.into());
780 map.insert(3_u32.into(), 0_u32.into());
781 Self {
782 catalog,
783 id: AtomicU32::new(3),
784 table_id_to_schema_id: Default::default(),
785 schema_id_to_database_id: RwLock::new(map),
786 hummock_snapshot_manager,
787 }
788 }
789
790 fn gen_id<T: From<u32>>(&self) -> T {
791 (self.id.fetch_add(1, Ordering::SeqCst) + 1).into()
793 }
794
795 fn add_table_or_source_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
796 self.table_id_to_schema_id
797 .write()
798 .insert(table_id, schema_id);
799 }
800
801 fn drop_table_or_source_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
802 let schema_id = self
803 .table_id_to_schema_id
804 .write()
805 .remove(&table_id)
806 .unwrap();
807 (self.get_database_id_by_schema(schema_id), schema_id)
808 }
809
810 fn add_table_or_sink_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
811 self.table_id_to_schema_id
812 .write()
813 .insert(table_id, schema_id);
814 }
815
816 fn add_table_or_subscription_id(
817 &self,
818 table_id: u32,
819 schema_id: SchemaId,
820 _database_id: DatabaseId,
821 ) {
822 self.table_id_to_schema_id
823 .write()
824 .insert(table_id, schema_id);
825 }
826
827 fn add_table_or_index_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
828 self.table_id_to_schema_id
829 .write()
830 .insert(table_id, schema_id);
831 }
832
833 fn drop_table_or_sink_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
834 let schema_id = self
835 .table_id_to_schema_id
836 .write()
837 .remove(&table_id)
838 .unwrap();
839 (self.get_database_id_by_schema(schema_id), schema_id)
840 }
841
842 fn drop_table_or_subscription_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
843 let schema_id = self
844 .table_id_to_schema_id
845 .write()
846 .remove(&table_id)
847 .unwrap();
848 (self.get_database_id_by_schema(schema_id), schema_id)
849 }
850
851 fn drop_table_or_index_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
852 let schema_id = self
853 .table_id_to_schema_id
854 .write()
855 .remove(&table_id)
856 .unwrap();
857 (self.get_database_id_by_schema(schema_id), schema_id)
858 }
859
860 fn add_schema_id(&self, schema_id: SchemaId, database_id: DatabaseId) {
861 self.schema_id_to_database_id
862 .write()
863 .insert(schema_id, database_id);
864 }
865
866 fn drop_schema_id(&self, schema_id: SchemaId) -> DatabaseId {
867 self.schema_id_to_database_id
868 .write()
869 .remove(&schema_id)
870 .unwrap()
871 }
872
873 fn create_source_inner(&self, mut source: PbSource) -> Result<SourceId> {
874 source.id = self.gen_id();
875 self.catalog.write().create_source(&source);
876 self.add_table_or_source_id(source.id.as_raw_id(), source.schema_id, source.database_id);
877 Ok(source.id)
878 }
879
880 fn create_sink_inner(&self, mut sink: PbSink, _graph: StreamFragmentGraph) -> Result<()> {
881 sink.id = self.gen_id();
882 sink.stream_job_status = PbStreamJobStatus::Created as _;
883 self.catalog.write().create_sink(&sink);
884 self.add_table_or_sink_id(sink.id.as_raw_id(), sink.schema_id, sink.database_id);
885 Ok(())
886 }
887
888 fn create_subscription_inner(&self, mut subscription: PbSubscription) -> Result<()> {
889 subscription.id = self.gen_id();
890 self.catalog.write().create_subscription(&subscription);
891 self.add_table_or_subscription_id(
892 subscription.id.as_raw_id(),
893 subscription.schema_id,
894 subscription.database_id,
895 );
896 Ok(())
897 }
898
899 fn get_database_id_by_schema(&self, schema_id: SchemaId) -> DatabaseId {
900 *self
901 .schema_id_to_database_id
902 .read()
903 .get(&schema_id)
904 .unwrap()
905 }
906}
907
908pub struct MockUserInfoWriter {
909 id: AtomicU32,
910 user_info: Arc<RwLock<UserInfoManager>>,
911}
912
913#[async_trait::async_trait]
914impl UserInfoWriter for MockUserInfoWriter {
915 async fn create_user(&self, user: UserInfo) -> Result<()> {
916 let mut user = user;
917 user.id = self.gen_id();
918 self.user_info.write().create_user(user);
919 Ok(())
920 }
921
922 async fn drop_user(&self, id: UserId) -> Result<()> {
923 self.user_info.write().drop_user(id);
924 Ok(())
925 }
926
927 async fn update_user(
928 &self,
929 update_user: UserInfo,
930 update_fields: Vec<UpdateField>,
931 ) -> Result<()> {
932 let mut lock = self.user_info.write();
933 let id = update_user.get_id();
934 let Some(old_name) = lock.get_user_name_by_id(id) else {
935 return Ok(());
936 };
937 let mut user_info = lock.get_user_by_name(&old_name).unwrap().to_prost();
938 update_fields.into_iter().for_each(|field| match field {
939 UpdateField::Super => user_info.is_super = update_user.is_super,
940 UpdateField::Login => user_info.can_login = update_user.can_login,
941 UpdateField::CreateDb => user_info.can_create_db = update_user.can_create_db,
942 UpdateField::CreateUser => user_info.can_create_user = update_user.can_create_user,
943 UpdateField::AuthInfo => user_info.auth_info.clone_from(&update_user.auth_info),
944 UpdateField::Rename => user_info.name.clone_from(&update_user.name),
945 UpdateField::Admin => user_info.is_admin = update_user.is_admin,
946 UpdateField::Unspecified => unreachable!(),
947 });
948 lock.update_user(update_user);
949 Ok(())
950 }
951
952 async fn grant_privilege(
955 &self,
956 users: Vec<UserId>,
957 privileges: Vec<GrantPrivilege>,
958 with_grant_option: bool,
959 _grantor: UserId,
960 ) -> Result<()> {
961 let privileges = privileges
962 .into_iter()
963 .map(|mut p| {
964 p.action_with_opts
965 .iter_mut()
966 .for_each(|ao| ao.with_grant_option = with_grant_option);
967 p
968 })
969 .collect::<Vec<_>>();
970 for user_id in users {
971 if let Some(u) = self.user_info.write().get_user_mut(user_id) {
972 u.extend_privileges(privileges.clone());
973 }
974 }
975 Ok(())
976 }
977
978 async fn revoke_privilege(
981 &self,
982 users: Vec<UserId>,
983 privileges: Vec<GrantPrivilege>,
984 _granted_by: UserId,
985 _revoke_by: UserId,
986 revoke_grant_option: bool,
987 _cascade: bool,
988 ) -> Result<()> {
989 for user_id in users {
990 if let Some(u) = self.user_info.write().get_user_mut(user_id) {
991 u.revoke_privileges(privileges.clone(), revoke_grant_option);
992 }
993 }
994 Ok(())
995 }
996
997 async fn alter_default_privilege(
998 &self,
999 _users: Vec<UserId>,
1000 _database_id: DatabaseId,
1001 _schemas: Vec<SchemaId>,
1002 _operation: AlterDefaultPrivilegeOperation,
1003 _operated_by: UserId,
1004 ) -> Result<()> {
1005 todo!()
1006 }
1007}
1008
1009impl MockUserInfoWriter {
1010 pub fn new(user_info: Arc<RwLock<UserInfoManager>>) -> Self {
1011 user_info.write().create_user(UserInfo {
1012 id: DEFAULT_SUPER_USER_ID,
1013 name: DEFAULT_SUPER_USER.to_owned(),
1014 is_super: true,
1015 can_create_db: true,
1016 can_create_user: true,
1017 can_login: true,
1018 ..Default::default()
1019 });
1020 user_info.write().create_user(UserInfo {
1021 id: DEFAULT_SUPER_USER_FOR_ADMIN_ID,
1022 name: DEFAULT_SUPER_USER_FOR_ADMIN.to_owned(),
1023 is_super: true,
1024 can_create_db: true,
1025 can_create_user: true,
1026 can_login: true,
1027 is_admin: true,
1028 ..Default::default()
1029 });
1030 Self {
1031 user_info,
1032 id: AtomicU32::new(NON_RESERVED_USER_ID as u32),
1033 }
1034 }
1035
1036 fn gen_id(&self) -> u32 {
1037 self.id.fetch_add(1, Ordering::SeqCst)
1038 }
1039}
1040
1041pub struct MockFrontendMetaClient {}
1042
1043#[async_trait::async_trait]
1044impl FrontendMetaClient for MockFrontendMetaClient {
1045 async fn try_unregister(&self) {}
1046
1047 async fn flush(&self, _database_id: DatabaseId) -> RpcResult<HummockVersionId> {
1048 Ok(INVALID_VERSION_ID)
1049 }
1050
1051 async fn wait(&self) -> RpcResult<()> {
1052 Ok(())
1053 }
1054
1055 async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult<Vec<u32>> {
1056 Ok(vec![])
1057 }
1058
1059 async fn list_table_fragments(
1060 &self,
1061 _table_ids: &[JobId],
1062 ) -> RpcResult<HashMap<JobId, TableFragmentInfo>> {
1063 Ok(HashMap::default())
1064 }
1065
1066 async fn list_streaming_job_states(&self) -> RpcResult<Vec<StreamingJobState>> {
1067 Ok(vec![])
1068 }
1069
1070 async fn list_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
1071 Ok(vec![])
1072 }
1073
1074 async fn list_creating_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
1075 Ok(vec![])
1076 }
1077
1078 async fn list_actor_states(&self) -> RpcResult<Vec<ActorState>> {
1079 Ok(vec![])
1080 }
1081
1082 async fn list_actor_splits(&self) -> RpcResult<Vec<ActorSplit>> {
1083 Ok(vec![])
1084 }
1085
1086 async fn list_object_dependencies(&self) -> RpcResult<Vec<PbObjectDependencies>> {
1087 Ok(vec![])
1088 }
1089
1090 async fn list_meta_snapshots(&self) -> RpcResult<Vec<MetaSnapshotMetadata>> {
1091 Ok(vec![])
1092 }
1093
1094 async fn set_system_param(
1095 &self,
1096 _param: String,
1097 _value: Option<String>,
1098 ) -> RpcResult<Option<SystemParamsReader>> {
1099 Ok(Some(SystemParams::default().into()))
1100 }
1101
1102 async fn get_session_params(&self) -> RpcResult<SessionConfig> {
1103 Ok(Default::default())
1104 }
1105
1106 async fn set_session_param(&self, _param: String, _value: Option<String>) -> RpcResult<String> {
1107 Ok("".to_owned())
1108 }
1109
1110 async fn get_ddl_progress(&self) -> RpcResult<Vec<DdlProgress>> {
1111 Ok(vec![])
1112 }
1113
1114 async fn get_tables(
1115 &self,
1116 _table_ids: Vec<crate::catalog::TableId>,
1117 _include_dropped_tables: bool,
1118 ) -> RpcResult<HashMap<crate::catalog::TableId, Table>> {
1119 Ok(HashMap::new())
1120 }
1121
1122 async fn list_hummock_pinned_versions(&self) -> RpcResult<Vec<(WorkerId, u64)>> {
1123 unimplemented!()
1124 }
1125
1126 async fn get_hummock_current_version(&self) -> RpcResult<HummockVersion> {
1127 Ok(HummockVersion::default())
1128 }
1129
1130 async fn get_hummock_checkpoint_version(&self) -> RpcResult<HummockVersion> {
1131 unimplemented!()
1132 }
1133
1134 async fn list_version_deltas(&self) -> RpcResult<Vec<HummockVersionDelta>> {
1135 unimplemented!()
1136 }
1137
1138 async fn list_branched_objects(&self) -> RpcResult<Vec<BranchedObject>> {
1139 unimplemented!()
1140 }
1141
1142 async fn list_hummock_compaction_group_configs(&self) -> RpcResult<Vec<CompactionGroupInfo>> {
1143 unimplemented!()
1144 }
1145
1146 async fn list_hummock_active_write_limits(&self) -> RpcResult<HashMap<u64, WriteLimit>> {
1147 unimplemented!()
1148 }
1149
1150 async fn list_hummock_meta_configs(&self) -> RpcResult<HashMap<String, String>> {
1151 unimplemented!()
1152 }
1153
1154 async fn list_event_log(&self) -> RpcResult<Vec<EventLog>> {
1155 unimplemented!()
1156 }
1157
1158 async fn list_compact_task_assignment(&self) -> RpcResult<Vec<CompactTaskAssignment>> {
1159 unimplemented!()
1160 }
1161
1162 async fn list_all_nodes(&self) -> RpcResult<Vec<WorkerNode>> {
1163 Ok(vec![])
1164 }
1165
1166 async fn list_compact_task_progress(&self) -> RpcResult<Vec<CompactTaskProgress>> {
1167 unimplemented!()
1168 }
1169
1170 async fn recover(&self) -> RpcResult<()> {
1171 unimplemented!()
1172 }
1173
1174 async fn apply_throttle(
1175 &self,
1176 _kind: PbThrottleTarget,
1177 _id: u32,
1178 _rate_limit: Option<u32>,
1179 ) -> RpcResult<()> {
1180 unimplemented!()
1181 }
1182
1183 async fn alter_fragment_parallelism(
1184 &self,
1185 _fragment_ids: Vec<FragmentId>,
1186 _parallelism: Option<PbTableParallelism>,
1187 ) -> RpcResult<()> {
1188 unimplemented!()
1189 }
1190
1191 async fn get_cluster_recovery_status(&self) -> RpcResult<RecoveryStatus> {
1192 Ok(RecoveryStatus::StatusRunning)
1193 }
1194
1195 async fn get_cluster_limits(&self) -> RpcResult<Vec<ClusterLimit>> {
1196 Ok(vec![])
1197 }
1198
1199 async fn list_rate_limits(&self) -> RpcResult<Vec<RateLimitInfo>> {
1200 Ok(vec![])
1201 }
1202
1203 async fn list_cdc_progress(&self) -> RpcResult<HashMap<JobId, PbCdcProgress>> {
1204 Ok(HashMap::default())
1205 }
1206
1207 async fn get_meta_store_endpoint(&self) -> RpcResult<String> {
1208 unimplemented!()
1209 }
1210
1211 async fn alter_sink_props(
1212 &self,
1213 _sink_id: SinkId,
1214 _changed_props: BTreeMap<String, String>,
1215 _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1216 _connector_conn_ref: Option<ConnectionId>,
1217 ) -> RpcResult<()> {
1218 unimplemented!()
1219 }
1220
1221 async fn alter_iceberg_table_props(
1222 &self,
1223 _table_id: TableId,
1224 _sink_id: SinkId,
1225 _source_id: SourceId,
1226 _changed_props: BTreeMap<String, String>,
1227 _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1228 _connector_conn_ref: Option<ConnectionId>,
1229 ) -> RpcResult<()> {
1230 unimplemented!()
1231 }
1232
1233 async fn alter_source_connector_props(
1234 &self,
1235 _source_id: SourceId,
1236 _changed_props: BTreeMap<String, String>,
1237 _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1238 _connector_conn_ref: Option<ConnectionId>,
1239 ) -> RpcResult<()> {
1240 unimplemented!()
1241 }
1242
1243 async fn list_hosted_iceberg_tables(&self) -> RpcResult<Vec<IcebergTable>> {
1244 unimplemented!()
1245 }
1246
1247 async fn get_fragment_by_id(
1248 &self,
1249 _fragment_id: FragmentId,
1250 ) -> RpcResult<Option<FragmentDistribution>> {
1251 unimplemented!()
1252 }
1253
1254 async fn get_fragment_vnodes(
1255 &self,
1256 _fragment_id: FragmentId,
1257 ) -> RpcResult<Vec<(ActorId, Vec<u32>)>> {
1258 unimplemented!()
1259 }
1260
1261 async fn get_actor_vnodes(&self, _actor_id: ActorId) -> RpcResult<Vec<u32>> {
1262 unimplemented!()
1263 }
1264
1265 fn worker_id(&self) -> WorkerId {
1266 0.into()
1267 }
1268
1269 async fn set_sync_log_store_aligned(&self, _job_id: JobId, _aligned: bool) -> RpcResult<()> {
1270 Ok(())
1271 }
1272
1273 async fn compact_iceberg_table(&self, _sink_id: SinkId) -> RpcResult<u64> {
1274 Ok(1)
1275 }
1276
1277 async fn expire_iceberg_table_snapshots(&self, _sink_id: SinkId) -> RpcResult<()> {
1278 Ok(())
1279 }
1280
1281 async fn refresh(&self, _request: RefreshRequest) -> RpcResult<RefreshResponse> {
1282 Ok(RefreshResponse { status: None })
1283 }
1284
1285 fn cluster_id(&self) -> &str {
1286 "test-cluster-uuid"
1287 }
1288
1289 async fn list_unmigrated_tables(&self) -> RpcResult<HashMap<crate::catalog::TableId, String>> {
1290 unimplemented!()
1291 }
1292}
1293
1294#[cfg(test)]
1295pub static PROTO_FILE_DATA: &str = r#"
1296 syntax = "proto3";
1297 package test;
1298 message TestRecord {
1299 int32 id = 1;
1300 Country country = 3;
1301 int64 zipcode = 4;
1302 float rate = 5;
1303 }
1304 message TestRecordAlterType {
1305 string id = 1;
1306 Country country = 3;
1307 int32 zipcode = 4;
1308 float rate = 5;
1309 }
1310 message TestRecordExt {
1311 int32 id = 1;
1312 Country country = 3;
1313 int64 zipcode = 4;
1314 float rate = 5;
1315 string name = 6;
1316 }
1317 message Country {
1318 string address = 1;
1319 City city = 2;
1320 string zipcode = 3;
1321 }
1322 message City {
1323 string address = 1;
1324 string zipcode = 2;
1325 }"#;
1326
1327pub fn create_proto_file(proto_data: &str) -> NamedTempFile {
1330 let in_file = Builder::new()
1331 .prefix("temp")
1332 .suffix(".proto")
1333 .rand_bytes(8)
1334 .tempfile()
1335 .unwrap();
1336
1337 let out_file = Builder::new()
1338 .prefix("temp")
1339 .suffix(".pb")
1340 .rand_bytes(8)
1341 .tempfile()
1342 .unwrap();
1343
1344 let mut file = in_file.as_file();
1345 file.write_all(proto_data.as_ref())
1346 .expect("writing binary to test file");
1347 file.flush().expect("flush temp file failed");
1348 let include_path = in_file
1349 .path()
1350 .parent()
1351 .unwrap()
1352 .to_string_lossy()
1353 .into_owned();
1354 let out_path = out_file.path().to_string_lossy().into_owned();
1355 let in_path = in_file.path().to_string_lossy().into_owned();
1356 let mut compile = std::process::Command::new("protoc");
1357
1358 let out = compile
1359 .arg("--include_imports")
1360 .arg("-I")
1361 .arg(include_path)
1362 .arg(format!("--descriptor_set_out={}", out_path))
1363 .arg(in_path)
1364 .output()
1365 .expect("failed to compile proto");
1366 if !out.status.success() {
1367 panic!("compile proto failed \n output: {:?}", out);
1368 }
1369 out_file
1370}