use std::collections::{BTreeMap, HashMap, HashSet};
use std::io::Write;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};

use futures_async_stream::for_await;
use parking_lot::RwLock;
use pgwire::net::{Address, AddressRef};
use pgwire::pg_response::StatementType;
use pgwire::pg_server::{BoxedError, SessionId, SessionManager, UserAuthenticator};
use pgwire::types::Row;
use risingwave_common::catalog::{
    AlterDatabaseParam, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER,
    DEFAULT_SUPER_USER_FOR_ADMIN, DEFAULT_SUPER_USER_FOR_ADMIN_ID, DEFAULT_SUPER_USER_ID,
    FunctionId, IndexId, NON_RESERVED_USER_ID, ObjectId, PG_CATALOG_SCHEMA_NAME,
    RW_CATALOG_SCHEMA_NAME, TableId,
};
use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat};
use risingwave_common::session_config::SessionConfig;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::util::cluster_limit::ClusterLimit;
use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{HummockVersionId, INVALID_VERSION_ID};
use risingwave_pb::backup_service::MetaSnapshotMetadata;
use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
use risingwave_pb::catalog::{
    PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbStreamJobStatus,
    PbSubscription, PbTable, PbView, Table,
};
use risingwave_pb::common::WorkerNode;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
use risingwave_pb::ddl_service::{
    DdlProgress, PbTableJobType, TableJobType, alter_name_request, alter_set_schema_request,
    alter_swap_rename_request, create_connection_request,
};
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::{
    BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::{
    EventLog, FragmentDistribution, PbTableParallelism, PbThrottleTarget, RecoveryStatus,
    RefreshRequest, RefreshResponse, SystemParams,
};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
use risingwave_pb::user::update_user_request::UpdateField;
use risingwave_pb::user::{GrantPrivilege, UserInfo};
use risingwave_rpc_client::error::Result as RpcResult;
use tempfile::{Builder, NamedTempFile};

use crate::FrontendOpts;
use crate::catalog::catalog_service::CatalogWriter;
use crate::catalog::root_catalog::Catalog;
use crate::catalog::{DatabaseId, SchemaId, SecretId, SinkId};
use crate::error::{ErrorCode, Result};
use crate::handler::RwPgResponse;
use crate::meta_client::FrontendMetaClient;
use crate::scheduler::HummockSnapshotManagerRef;
use crate::session::{AuthContext, FrontendEnv, SessionImpl};
use crate::user::UserId;
use crate::user::user_manager::UserInfoManager;
use crate::user::user_service::UserInfoWriter;

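/// An embedded frontend for tests: it plans and executes SQL against a mocked
/// `FrontendEnv` without a meta service or compute nodes.
///
/// A minimal usage sketch (illustrative only; assumes the test build provides
/// a usable default `FrontendOpts`):
///
/// ```ignore
/// let frontend = LocalFrontend::new(FrontendOpts::default()).await;
/// let plan = frontend.get_explain_output("EXPLAIN SELECT 1").await;
/// ```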
pub struct LocalFrontend {
    pub opts: FrontendOpts,
    env: FrontendEnv,
}

impl SessionManager for LocalFrontend {
    type Session = SessionImpl;

    fn create_dummy_session(
        &self,
        _database_id: u32,
        _user_name: u32,
    ) -> std::result::Result<Arc<Self::Session>, BoxedError> {
        unreachable!()
    }

    fn connect(
        &self,
        _database: &str,
        _user_name: &str,
        _peer_addr: AddressRef,
    ) -> std::result::Result<Arc<Self::Session>, BoxedError> {
        Ok(self.session_ref())
    }

    fn cancel_queries_in_session(&self, _session_id: SessionId) {
        unreachable!()
    }

    fn cancel_creating_jobs_in_session(&self, _session_id: SessionId) {
        unreachable!()
    }

    fn end_session(&self, _session: &Self::Session) {
        unreachable!()
    }
}

impl LocalFrontend {
    #[expect(clippy::unused_async)]
    pub async fn new(opts: FrontendOpts) -> Self {
        let env = FrontendEnv::mock();
        Self { opts, env }
    }

    pub async fn run_sql(
        &self,
        sql: impl Into<String>,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        self.session_ref().run_statement(sql, vec![]).await
    }

    pub async fn run_sql_with_session(
        &self,
        session_ref: Arc<SessionImpl>,
        sql: impl Into<String>,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        session_ref.run_statement(sql, vec![]).await
    }

    pub async fn run_user_sql(
        &self,
        sql: impl Into<String>,
        database: String,
        user_name: String,
        user_id: UserId,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        self.session_user_ref(database, user_name, user_id)
            .run_statement(sql, vec![])
            .await
    }

    pub async fn query_formatted_result(&self, sql: impl Into<String>) -> Vec<String> {
        let mut rsp = self.run_sql(sql).await.unwrap();
        let mut res = vec![];
        #[for_await]
        for row_set in rsp.values_stream() {
            for row in row_set.unwrap() {
                res.push(format!("{:?}", row));
            }
        }
        res
    }

    pub async fn get_explain_output(&self, sql: impl Into<String>) -> String {
        let mut rsp = self.run_sql(sql).await.unwrap();
        assert_eq!(rsp.stmt_type(), StatementType::EXPLAIN);
        let mut res = String::new();
        #[for_await]
        for row_set in rsp.values_stream() {
            for row in row_set.unwrap() {
                let row: Row = row;
                let row = row.values()[0].as_ref().unwrap();
                res += std::str::from_utf8(row).unwrap();
                res += "\n";
            }
        }
        res
    }

    pub fn session_ref(&self) -> Arc<SessionImpl> {
        self.session_user_ref(
            DEFAULT_DATABASE_NAME.to_owned(),
            DEFAULT_SUPER_USER.to_owned(),
            DEFAULT_SUPER_USER_ID,
        )
    }

    pub fn session_user_ref(
        &self,
        database: String,
        user_name: String,
        user_id: UserId,
    ) -> Arc<SessionImpl> {
        Arc::new(SessionImpl::new(
            self.env.clone(),
            AuthContext::new(database, user_name, user_id),
            UserAuthenticator::None,
            (0, 0),
            Address::Tcp(SocketAddr::new(
                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
                6666,
            ))
            .into(),
            Default::default(),
        ))
    }
}

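/// Like [`LocalFrontend::get_explain_output`], but for an already-obtained
/// response. Panics if the response is not from an `EXPLAIN` statement.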
pub async fn get_explain_output(mut rsp: RwPgResponse) -> String {
    if rsp.stmt_type() != StatementType::EXPLAIN {
        panic!("RESPONSE INVALID: {rsp:?}");
    }
    let mut res = String::new();
    #[for_await]
    for row_set in rsp.values_stream() {
        for row in row_set.unwrap() {
            let row: Row = row;
            let row = row.values()[0].as_ref().unwrap();
            res += std::str::from_utf8(row).unwrap();
            res += "\n";
        }
    }
    res
}

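/// A mock [`CatalogWriter`] that applies DDL directly to an in-memory
/// [`Catalog`] instead of going through the meta service. Ids are drawn from
/// a single atomic counter, and the two side maps record which schema (and
/// thus database) each object belongs to so the matching `drop_*` call can
/// locate it later.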
pub struct MockCatalogWriter {
    catalog: Arc<RwLock<Catalog>>,
    id: AtomicU32,
    table_id_to_schema_id: RwLock<HashMap<u32, SchemaId>>,
    schema_id_to_database_id: RwLock<HashMap<u32, DatabaseId>>,
    hummock_snapshot_manager: HummockSnapshotManagerRef,
}

#[async_trait::async_trait]
impl CatalogWriter for MockCatalogWriter {
    async fn create_database(
        &self,
        db_name: &str,
        owner: UserId,
        resource_group: &str,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
    ) -> Result<()> {
        let database_id = self.gen_id();
        self.catalog.write().create_database(&PbDatabase {
            name: db_name.to_owned(),
            id: database_id,
            owner,
            resource_group: resource_group.to_owned(),
            barrier_interval_ms,
            checkpoint_frequency,
        });
        self.create_schema(database_id, DEFAULT_SCHEMA_NAME, owner)
            .await?;
        self.create_schema(database_id, PG_CATALOG_SCHEMA_NAME, owner)
            .await?;
        self.create_schema(database_id, RW_CATALOG_SCHEMA_NAME, owner)
            .await?;
        Ok(())
    }

    async fn create_schema(
        &self,
        db_id: DatabaseId,
        schema_name: &str,
        owner: UserId,
    ) -> Result<()> {
        let id = self.gen_id();
        self.catalog.write().create_schema(&PbSchema {
            id,
            name: schema_name.to_owned(),
            database_id: db_id,
            owner,
        });
        self.add_schema_id(id, db_id);
        Ok(())
    }

    async fn create_materialized_view(
        &self,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
        _dependencies: HashSet<ObjectId>,
        _specific_resource_group: Option<String>,
        _if_not_exists: bool,
    ) -> Result<()> {
        table.id = self.gen_id();
        table.stream_job_status = PbStreamJobStatus::Created as _;
        table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
        self.catalog.write().create_table(&table);
        self.add_table_or_source_id(table.id, table.schema_id, table.database_id);
        self.hummock_snapshot_manager
            .add_table_for_test(TableId::new(table.id));
        Ok(())
    }

    async fn replace_materialized_view(
        &self,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
    ) -> Result<()> {
        table.stream_job_status = PbStreamJobStatus::Created as _;
        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
        self.catalog.write().update_table(&table);
        Ok(())
    }

    async fn create_view(&self, mut view: PbView, _dependencies: HashSet<ObjectId>) -> Result<()> {
        view.id = self.gen_id();
        self.catalog.write().create_view(&view);
        self.add_table_or_source_id(view.id, view.schema_id, view.database_id);
        Ok(())
    }

    async fn create_table(
        &self,
        source: Option<PbSource>,
        mut table: PbTable,
        graph: StreamFragmentGraph,
        _job_type: PbTableJobType,
        if_not_exists: bool,
        _dependencies: HashSet<ObjectId>,
    ) -> Result<()> {
        if let Some(source) = source {
            let source_id = self.create_source_inner(source)?;
            table.optional_associated_source_id =
                Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id));
        }
        self.create_materialized_view(table, graph, HashSet::new(), None, if_not_exists)
            .await?;
        Ok(())
    }

    async fn replace_table(
        &self,
        _source: Option<PbSource>,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
        _job_type: TableJobType,
    ) -> Result<()> {
        table.stream_job_status = PbStreamJobStatus::Created as _;
        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
        self.catalog.write().update_table(&table);
        Ok(())
    }

    async fn replace_source(&self, source: PbSource, _graph: StreamFragmentGraph) -> Result<()> {
        self.catalog.write().update_source(&source);
        Ok(())
    }

    async fn create_source(
        &self,
        source: PbSource,
        _graph: Option<StreamFragmentGraph>,
        _if_not_exists: bool,
    ) -> Result<()> {
        self.create_source_inner(source).map(|_| ())
    }

    async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        _dependencies: HashSet<ObjectId>,
        _if_not_exists: bool,
    ) -> Result<()> {
        self.create_sink_inner(sink, graph)
    }

    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
        self.create_subscription_inner(subscription)
    }

    async fn create_index(
        &self,
        mut index: PbIndex,
        mut index_table: PbTable,
        _graph: StreamFragmentGraph,
        _if_not_exists: bool,
    ) -> Result<()> {
        index_table.id = self.gen_id();
        index_table.stream_job_status = PbStreamJobStatus::Created as _;
        index_table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
        self.catalog.write().create_table(&index_table);
        self.add_table_or_index_id(
            index_table.id,
            index_table.schema_id,
            index_table.database_id,
        );

        index.id = index_table.id;
        index.index_table_id = index_table.id;
        self.catalog.write().create_index(&index);
        Ok(())
    }

    async fn create_function(&self, _function: PbFunction) -> Result<()> {
        unreachable!()
    }

    async fn create_connection(
        &self,
        _connection_name: String,
        _database_id: u32,
        _schema_id: u32,
        _owner_id: u32,
        _connection: create_connection_request::Payload,
    ) -> Result<()> {
        unreachable!()
    }

    async fn create_secret(
        &self,
        _secret_name: String,
        _database_id: u32,
        _schema_id: u32,
        _owner_id: u32,
        _payload: Vec<u8>,
    ) -> Result<()> {
        unreachable!()
    }

    async fn comment_on(&self, _comment: PbComment) -> Result<()> {
        unreachable!()
    }

    async fn drop_table(
        &self,
        source_id: Option<u32>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        if let Some(source_id) = source_id {
            self.drop_table_or_source_id(source_id);
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.table_id);
        let indexes =
            self.catalog
                .read()
                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
        for index in indexes {
            self.drop_index(index.id, cascade).await?;
        }
        self.catalog
            .write()
            .drop_table(database_id, schema_id, table_id);
        if let Some(source_id) = source_id {
            self.catalog
                .write()
                .drop_source(database_id, schema_id, source_id);
        }
        Ok(())
    }

    async fn drop_view(&self, _view_id: u32, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.table_id);
        let indexes =
            self.catalog
                .read()
                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
        for index in indexes {
            self.drop_index(index.id, cascade).await?;
        }
        self.catalog
            .write()
            .drop_table(database_id, schema_id, table_id);
        Ok(())
    }

    async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(source_id);
        self.catalog
            .write()
            .drop_source(database_id, schema_id, source_id);
        Ok(())
    }

    async fn drop_sink(&self, sink_id: u32, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_sink_id(sink_id);
        self.catalog
            .write()
            .drop_sink(database_id, schema_id, sink_id);
        Ok(())
    }

    async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_subscription_id(subscription_id);
        self.catalog
            .write()
            .drop_subscription(database_id, schema_id, subscription_id);
        Ok(())
    }

    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let &schema_id = self
            .table_id_to_schema_id
            .read()
            .get(&index_id.index_id)
            .unwrap();
        let database_id = self.get_database_id_by_schema(schema_id);

        let index = {
            let catalog_reader = self.catalog.read();
            let schema_catalog = catalog_reader
                .get_schema_by_id(&database_id, &schema_id)
                .unwrap();
            schema_catalog.get_index_by_id(&index_id).unwrap().clone()
        };

        let index_table_id = index.index_table().id;
        let (database_id, schema_id) = self.drop_table_or_index_id(index_id.index_id);
        self.catalog
            .write()
            .drop_index(database_id, schema_id, index_id);
        self.catalog
            .write()
            .drop_table(database_id, schema_id, index_table_id);
        Ok(())
    }

    async fn drop_function(&self, _function_id: FunctionId, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_connection(&self, _connection_id: u32, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_secret(&self, _secret_id: SecretId) -> Result<()> {
        unreachable!()
    }

    async fn drop_database(&self, database_id: u32) -> Result<()> {
        self.catalog.write().drop_database(database_id);
        Ok(())
    }

    async fn drop_schema(&self, schema_id: u32, _cascade: bool) -> Result<()> {
        let database_id = self.drop_schema_id(schema_id);
        self.catalog.write().drop_schema(database_id, schema_id);
        Ok(())
    }

    async fn alter_name(
        &self,
        object_id: alter_name_request::Object,
        object_name: &str,
    ) -> Result<()> {
        match object_id {
            alter_name_request::Object::TableId(table_id) => {
                self.catalog
                    .write()
                    .alter_table_name_by_id(&table_id.into(), object_name);
                Ok(())
            }
            _ => {
                unimplemented!()
            }
        }
    }

    async fn alter_source(&self, source: PbSource) -> Result<()> {
        self.catalog.write().update_source(&source);
        Ok(())
    }

    async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<()> {
        for database in self.catalog.read().iter_databases() {
            for schema in database.iter_schemas() {
                match object {
                    Object::TableId(table_id) => {
                        if let Some(table) =
                            schema.get_created_table_by_id(&TableId::from(table_id))
                        {
                            let mut pb_table = table.to_prost();
                            pb_table.owner = owner_id;
                            self.catalog.write().update_table(&pb_table);
                            return Ok(());
                        }
                    }
                    _ => unreachable!(),
                }
            }
        }

        Err(ErrorCode::ItemNotFound(format!("object not found: {:?}", object)).into())
    }

    async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: u32,
    ) -> Result<()> {
        match object {
            alter_set_schema_request::Object::TableId(table_id) => {
                let mut pb_table = {
                    let reader = self.catalog.read();
                    let table = reader.get_any_table_by_id(&table_id.into())?.to_owned();
                    table.to_prost()
                };
                pb_table.schema_id = new_schema_id;
                self.catalog.write().update_table(&pb_table);
                self.table_id_to_schema_id
                    .write()
                    .insert(table_id, new_schema_id);
                Ok(())
            }
            _ => unreachable!(),
        }
    }

    async fn alter_parallelism(
        &self,
        _table_id: u32,
        _parallelism: PbTableParallelism,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }

    async fn alter_swap_rename(&self, _object: alter_swap_rename_request::Object) -> Result<()> {
        todo!()
    }

    async fn alter_secret(
        &self,
        _secret_id: u32,
        _secret_name: String,
        _database_id: u32,
        _schema_id: u32,
        _owner_id: u32,
        _payload: Vec<u8>,
    ) -> Result<()> {
        unreachable!()
    }

    async fn alter_resource_group(
        &self,
        _table_id: u32,
        _resource_group: Option<String>,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }

    async fn alter_database_param(
        &self,
        database_id: u32,
        param: AlterDatabaseParam,
    ) -> Result<()> {
        let mut pb_database = {
            let reader = self.catalog.read();
            let database = reader.get_database_by_id(&database_id)?.to_owned();
            database.to_prost()
        };
        match param {
            AlterDatabaseParam::BarrierIntervalMs(interval) => {
                pb_database.barrier_interval_ms = interval;
            }
            AlterDatabaseParam::CheckpointFrequency(frequency) => {
                pb_database.checkpoint_frequency = frequency;
            }
        }
        self.catalog.write().update_database(&pb_database);
        Ok(())
    }

    async fn create_iceberg_table(
        &self,
        _table_job_info: PbTableJobInfo,
        _sink_job_info: PbSinkJobInfo,
        _iceberg_source: PbSource,
        _if_not_exists: bool,
    ) -> Result<()> {
        todo!()
    }
}

impl MockCatalogWriter {
    pub fn new(
        catalog: Arc<RwLock<Catalog>>,
        hummock_snapshot_manager: HummockSnapshotManagerRef,
    ) -> Self {
        catalog.write().create_database(&PbDatabase {
            id: 0,
            name: DEFAULT_DATABASE_NAME.to_owned(),
            owner: DEFAULT_SUPER_USER_ID,
            resource_group: DEFAULT_RESOURCE_GROUP.to_owned(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        });
        catalog.write().create_schema(&PbSchema {
            id: 1,
            name: DEFAULT_SCHEMA_NAME.to_owned(),
            database_id: 0,
            owner: DEFAULT_SUPER_USER_ID,
        });
        catalog.write().create_schema(&PbSchema {
            id: 2,
            name: PG_CATALOG_SCHEMA_NAME.to_owned(),
            database_id: 0,
            owner: DEFAULT_SUPER_USER_ID,
        });
        catalog.write().create_schema(&PbSchema {
            id: 3,
            name: RW_CATALOG_SCHEMA_NAME.to_owned(),
            database_id: 0,
            owner: DEFAULT_SUPER_USER_ID,
        });
        let mut map: HashMap<u32, DatabaseId> = HashMap::new();
        map.insert(1_u32, 0_u32);
        map.insert(2_u32, 0_u32);
        map.insert(3_u32, 0_u32);
        Self {
            catalog,
            id: AtomicU32::new(3),
            table_id_to_schema_id: Default::default(),
            schema_id_to_database_id: RwLock::new(map),
            hummock_snapshot_manager,
        }
    }

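    /// Atomically bumps the id counter. `fetch_add` returns the *previous*
    /// value, so the `+ 1` yields a fresh id strictly greater than the
    /// pre-seeded ids (database 0, schemas 1..=3) used in `new` above.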
    fn gen_id(&self) -> u32 {
        self.id.fetch_add(1, Ordering::SeqCst) + 1
    }

    fn add_table_or_source_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn drop_table_or_source_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn add_table_or_sink_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn add_table_or_subscription_id(
        &self,
        table_id: u32,
        schema_id: SchemaId,
        _database_id: DatabaseId,
    ) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn add_table_or_index_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn drop_table_or_sink_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn drop_table_or_subscription_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn drop_table_or_index_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn add_schema_id(&self, schema_id: u32, database_id: DatabaseId) {
        self.schema_id_to_database_id
            .write()
            .insert(schema_id, database_id);
    }

    fn drop_schema_id(&self, schema_id: u32) -> DatabaseId {
        self.schema_id_to_database_id
            .write()
            .remove(&schema_id)
            .unwrap()
    }

    fn create_source_inner(&self, mut source: PbSource) -> Result<u32> {
        source.id = self.gen_id();
        self.catalog.write().create_source(&source);
        self.add_table_or_source_id(source.id, source.schema_id, source.database_id);
        Ok(source.id)
    }

    fn create_sink_inner(&self, mut sink: PbSink, _graph: StreamFragmentGraph) -> Result<()> {
        sink.id = self.gen_id();
        sink.stream_job_status = PbStreamJobStatus::Created as _;
        self.catalog.write().create_sink(&sink);
        self.add_table_or_sink_id(sink.id, sink.schema_id, sink.database_id);
        Ok(())
    }

    fn create_subscription_inner(&self, mut subscription: PbSubscription) -> Result<()> {
        subscription.id = self.gen_id();
        self.catalog.write().create_subscription(&subscription);
        self.add_table_or_subscription_id(
            subscription.id,
            subscription.schema_id,
            subscription.database_id,
        );
        Ok(())
    }

    fn get_database_id_by_schema(&self, schema_id: u32) -> DatabaseId {
        *self
            .schema_id_to_database_id
            .read()
            .get(&schema_id)
            .unwrap()
    }
}

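/// A mock [`UserInfoWriter`] backed by an in-memory [`UserInfoManager`],
/// pre-seeded with the default superusers; ids for newly created users start
/// at `NON_RESERVED_USER_ID`.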
pub struct MockUserInfoWriter {
    id: AtomicU32,
    user_info: Arc<RwLock<UserInfoManager>>,
}

#[async_trait::async_trait]
impl UserInfoWriter for MockUserInfoWriter {
    async fn create_user(&self, user: UserInfo) -> Result<()> {
        let mut user = user;
        user.id = self.gen_id();
        self.user_info.write().create_user(user);
        Ok(())
    }

    async fn drop_user(&self, id: UserId) -> Result<()> {
        self.user_info.write().drop_user(id);
        Ok(())
    }

    async fn update_user(
        &self,
        update_user: UserInfo,
        update_fields: Vec<UpdateField>,
    ) -> Result<()> {
        let mut lock = self.user_info.write();
        let id = update_user.get_id();
        let Some(old_name) = lock.get_user_name_by_id(id) else {
            return Ok(());
        };
        let mut user_info = lock.get_user_by_name(&old_name).unwrap().to_prost();
        update_fields.into_iter().for_each(|field| match field {
            UpdateField::Super => user_info.is_super = update_user.is_super,
            UpdateField::Login => user_info.can_login = update_user.can_login,
            UpdateField::CreateDb => user_info.can_create_db = update_user.can_create_db,
            UpdateField::CreateUser => user_info.can_create_user = update_user.can_create_user,
            UpdateField::AuthInfo => user_info.auth_info.clone_from(&update_user.auth_info),
            UpdateField::Rename => user_info.name.clone_from(&update_user.name),
            UpdateField::Admin => user_info.is_admin = update_user.is_admin,
            UpdateField::Unspecified => unreachable!(),
        });
        // Apply the merged info so that only the requested fields change.
        lock.update_user(user_info);
        Ok(())
    }

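    /// Stamps every action in `privileges` with `with_grant_option` and then
    /// appends the result to each target user's privilege list.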
    async fn grant_privilege(
        &self,
        users: Vec<UserId>,
        privileges: Vec<GrantPrivilege>,
        with_grant_option: bool,
        _grantor: UserId,
    ) -> Result<()> {
        let privileges = privileges
            .into_iter()
            .map(|mut p| {
                p.action_with_opts
                    .iter_mut()
                    .for_each(|ao| ao.with_grant_option = with_grant_option);
                p
            })
            .collect::<Vec<_>>();
        for user_id in users {
            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
                u.extend_privileges(privileges.clone());
            }
        }
        Ok(())
    }

    async fn revoke_privilege(
        &self,
        users: Vec<UserId>,
        privileges: Vec<GrantPrivilege>,
        _granted_by: UserId,
        _revoke_by: UserId,
        revoke_grant_option: bool,
        _cascade: bool,
    ) -> Result<()> {
        for user_id in users {
            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
                u.revoke_privileges(privileges.clone(), revoke_grant_option);
            }
        }
        Ok(())
    }

    async fn alter_default_privilege(
        &self,
        _users: Vec<UserId>,
        _database_id: DatabaseId,
        _schemas: Vec<SchemaId>,
        _operation: AlterDefaultPrivilegeOperation,
        _operated_by: UserId,
    ) -> Result<()> {
        todo!()
    }
}

impl MockUserInfoWriter {
    pub fn new(user_info: Arc<RwLock<UserInfoManager>>) -> Self {
        user_info.write().create_user(UserInfo {
            id: DEFAULT_SUPER_USER_ID,
            name: DEFAULT_SUPER_USER.to_owned(),
            is_super: true,
            can_create_db: true,
            can_create_user: true,
            can_login: true,
            ..Default::default()
        });
        user_info.write().create_user(UserInfo {
            id: DEFAULT_SUPER_USER_FOR_ADMIN_ID,
            name: DEFAULT_SUPER_USER_FOR_ADMIN.to_owned(),
            is_super: true,
            can_create_db: true,
            can_create_user: true,
            can_login: true,
            is_admin: true,
            ..Default::default()
        });
        Self {
            user_info,
            id: AtomicU32::new(NON_RESERVED_USER_ID as u32),
        }
    }

    fn gen_id(&self) -> u32 {
        self.id.fetch_add(1, Ordering::SeqCst)
    }
}

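/// A [`FrontendMetaClient`] stub: queries that tests commonly issue return
/// empty or default responses, while RPCs the tests are not expected to reach
/// remain `unimplemented!()`.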
pub struct MockFrontendMetaClient {}

#[async_trait::async_trait]
impl FrontendMetaClient for MockFrontendMetaClient {
    async fn try_unregister(&self) {}

    async fn flush(&self, _database_id: DatabaseId) -> RpcResult<HummockVersionId> {
        Ok(INVALID_VERSION_ID)
    }

    async fn wait(&self) -> RpcResult<()> {
        Ok(())
    }

    async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult<Vec<u32>> {
        Ok(vec![])
    }

    async fn list_table_fragments(
        &self,
        _table_ids: &[u32],
    ) -> RpcResult<HashMap<u32, TableFragmentInfo>> {
        Ok(HashMap::default())
    }

    async fn list_streaming_job_states(&self) -> RpcResult<Vec<StreamingJobState>> {
        Ok(vec![])
    }

    async fn list_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
        Ok(vec![])
    }

    async fn list_creating_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
        Ok(vec![])
    }

    async fn list_actor_states(&self) -> RpcResult<Vec<ActorState>> {
        Ok(vec![])
    }

    async fn list_actor_splits(&self) -> RpcResult<Vec<ActorSplit>> {
        Ok(vec![])
    }

    async fn list_object_dependencies(&self) -> RpcResult<Vec<PbObjectDependencies>> {
        Ok(vec![])
    }

    async fn list_meta_snapshots(&self) -> RpcResult<Vec<MetaSnapshotMetadata>> {
        Ok(vec![])
    }

    async fn set_system_param(
        &self,
        _param: String,
        _value: Option<String>,
    ) -> RpcResult<Option<SystemParamsReader>> {
        Ok(Some(SystemParams::default().into()))
    }

    async fn get_session_params(&self) -> RpcResult<SessionConfig> {
        Ok(Default::default())
    }

    async fn set_session_param(&self, _param: String, _value: Option<String>) -> RpcResult<String> {
        Ok("".to_owned())
    }

    async fn get_ddl_progress(&self) -> RpcResult<Vec<DdlProgress>> {
        Ok(vec![])
    }

    async fn get_tables(
        &self,
        _table_ids: &[u32],
        _include_dropped_tables: bool,
    ) -> RpcResult<HashMap<u32, Table>> {
        Ok(HashMap::new())
    }

    async fn list_hummock_pinned_versions(&self) -> RpcResult<Vec<(u32, u64)>> {
        unimplemented!()
    }

    async fn get_hummock_current_version(&self) -> RpcResult<HummockVersion> {
        Ok(HummockVersion::default())
    }

    async fn get_hummock_checkpoint_version(&self) -> RpcResult<HummockVersion> {
        unimplemented!()
    }

    async fn list_version_deltas(&self) -> RpcResult<Vec<HummockVersionDelta>> {
        unimplemented!()
    }

    async fn list_branched_objects(&self) -> RpcResult<Vec<BranchedObject>> {
        unimplemented!()
    }

    async fn list_hummock_compaction_group_configs(&self) -> RpcResult<Vec<CompactionGroupInfo>> {
        unimplemented!()
    }

    async fn list_hummock_active_write_limits(&self) -> RpcResult<HashMap<u64, WriteLimit>> {
        unimplemented!()
    }

    async fn list_hummock_meta_configs(&self) -> RpcResult<HashMap<String, String>> {
        unimplemented!()
    }

    async fn list_event_log(&self) -> RpcResult<Vec<EventLog>> {
        unimplemented!()
    }

    async fn list_compact_task_assignment(&self) -> RpcResult<Vec<CompactTaskAssignment>> {
        unimplemented!()
    }

    async fn list_all_nodes(&self) -> RpcResult<Vec<WorkerNode>> {
        Ok(vec![])
    }

    async fn list_compact_task_progress(&self) -> RpcResult<Vec<CompactTaskProgress>> {
        unimplemented!()
    }

    async fn recover(&self) -> RpcResult<()> {
        unimplemented!()
    }

    async fn apply_throttle(
        &self,
        _kind: PbThrottleTarget,
        _id: u32,
        _rate_limit: Option<u32>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn get_cluster_recovery_status(&self) -> RpcResult<RecoveryStatus> {
        Ok(RecoveryStatus::StatusRunning)
    }

    async fn get_cluster_limits(&self) -> RpcResult<Vec<ClusterLimit>> {
        Ok(vec![])
    }

    async fn list_rate_limits(&self) -> RpcResult<Vec<RateLimitInfo>> {
        Ok(vec![])
    }

    async fn list_cdc_progress(&self) -> RpcResult<HashMap<u32, PbCdcProgress>> {
        Ok(HashMap::default())
    }

    async fn get_meta_store_endpoint(&self) -> RpcResult<String> {
        unimplemented!()
    }

    async fn alter_sink_props(
        &self,
        _sink_id: u32,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
        _connector_conn_ref: Option<u32>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn alter_iceberg_table_props(
        &self,
        _table_id: u32,
        _sink_id: u32,
        _source_id: u32,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
        _connector_conn_ref: Option<u32>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn alter_source_connector_props(
        &self,
        _source_id: u32,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
        _connector_conn_ref: Option<u32>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn list_hosted_iceberg_tables(&self) -> RpcResult<Vec<IcebergTable>> {
        unimplemented!()
    }

    async fn get_fragment_by_id(
        &self,
        _fragment_id: u32,
    ) -> RpcResult<Option<FragmentDistribution>> {
        unimplemented!()
    }

    fn worker_id(&self) -> u32 {
        0
    }

    async fn set_sync_log_store_aligned(&self, _job_id: u32, _aligned: bool) -> RpcResult<()> {
        Ok(())
    }

    async fn compact_iceberg_table(&self, _sink_id: SinkId) -> RpcResult<u64> {
        Ok(1)
    }

    async fn expire_iceberg_table_snapshots(&self, _sink_id: SinkId) -> RpcResult<()> {
        Ok(())
    }

    async fn refresh(&self, _request: RefreshRequest) -> RpcResult<RefreshResponse> {
        Ok(RefreshResponse { status: None })
    }
}

#[cfg(test)]
pub static PROTO_FILE_DATA: &str = r#"
    syntax = "proto3";
    package test;
    message TestRecord {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
    }
    message TestRecordAlterType {
      string id = 1;
      Country country = 3;
      int32 zipcode = 4;
      float rate = 5;
    }
    message TestRecordExt {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
      string name = 6;
    }
    message Country {
      string address = 1;
      City city = 2;
      string zipcode = 3;
    }
    message City {
      string address = 1;
      string zipcode = 2;
    }"#;

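/// Compiles a protobuf schema with the `protoc` binary (which must be on
/// `PATH`) and returns the temp file holding the resulting
/// `FileDescriptorSet`. Panics if compilation fails.
///
/// An illustrative sketch of how a test might point a schema location at it:
///
/// ```ignore
/// let file = create_proto_file(PROTO_FILE_DATA);
/// let schema_location = format!("file://{}", file.path().display());
/// ```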
pub fn create_proto_file(proto_data: &str) -> NamedTempFile {
    let in_file = Builder::new()
        .prefix("temp")
        .suffix(".proto")
        .rand_bytes(8)
        .tempfile()
        .unwrap();

    let out_file = Builder::new()
        .prefix("temp")
        .suffix(".pb")
        .rand_bytes(8)
        .tempfile()
        .unwrap();

    let mut file = in_file.as_file();
    file.write_all(proto_data.as_ref())
        .expect("writing binary to test file");
    file.flush().expect("flush temp file failed");
    let include_path = in_file
        .path()
        .parent()
        .unwrap()
        .to_string_lossy()
        .into_owned();
    let out_path = out_file.path().to_string_lossy().into_owned();
    let in_path = in_file.path().to_string_lossy().into_owned();
    let mut compile = std::process::Command::new("protoc");

    let out = compile
        .arg("--include_imports")
        .arg("-I")
        .arg(include_path)
        .arg(format!("--descriptor_set_out={}", out_path))
        .arg(in_path)
        .output()
        .expect("failed to compile proto");
    if !out.status.success() {
        panic!("compile proto failed \n output: {:?}", out);
    }
    out_file
}