1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::io::Write;
17use std::net::{IpAddr, Ipv4Addr, SocketAddr};
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU32, Ordering};
20
21use futures_async_stream::for_await;
22use parking_lot::RwLock;
23use pgwire::net::{Address, AddressRef};
24use pgwire::pg_response::StatementType;
25use pgwire::pg_server::{SessionId, SessionManager, UserAuthenticator};
26use pgwire::types::Row;
27use risingwave_common::catalog::{
28 AlterDatabaseParam, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER,
29 DEFAULT_SUPER_USER_FOR_ADMIN, DEFAULT_SUPER_USER_FOR_ADMIN_ID, DEFAULT_SUPER_USER_ID,
30 FunctionId, IndexId, NON_RESERVED_USER_ID, ObjectId, PG_CATALOG_SCHEMA_NAME,
31 RW_CATALOG_SCHEMA_NAME, TableId,
32};
33use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat};
34use risingwave_common::id::{ConnectionId, JobId, SourceId, SubscriptionId, ViewId, WorkerId};
35use risingwave_common::session_config::SessionConfig;
36use risingwave_common::system_param::reader::SystemParamsReader;
37use risingwave_common::util::cluster_limit::ClusterLimit;
38use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
39use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
40use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId, INVALID_VERSION_ID};
41use risingwave_pb::backup_service::MetaSnapshotMetadata;
42use risingwave_pb::catalog::{
43 PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbStreamJobStatus,
44 PbSubscription, PbTable, PbView, Table,
45};
46use risingwave_pb::common::WorkerNode;
47use risingwave_pb::ddl_service::alter_owner_request::Object;
48use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
49use risingwave_pb::ddl_service::{
50 DdlProgress, PbTableJobType, TableJobType, alter_name_request, alter_set_schema_request,
51 alter_swap_rename_request, create_connection_request, streaming_job_resource_type,
52};
53use risingwave_pb::hummock::write_limits::WriteLimit;
54use risingwave_pb::hummock::{
55 BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
56};
57use risingwave_pb::id::ActorId;
58use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
59use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
60use risingwave_pb::meta::list_actor_states_response::ActorState;
61use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
62use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
63use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
64use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
65use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
66use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
67use risingwave_pb::meta::{
68 EventLog, FragmentDistribution, PbTableParallelism, PbThrottleTarget, RecoveryStatus,
69 RefreshRequest, RefreshResponse, SystemParams, list_sink_log_store_tables_response,
70};
71use risingwave_pb::secret::PbSecretRef;
72use risingwave_pb::stream_plan::StreamFragmentGraph;
73use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
74use risingwave_pb::user::update_user_request::UpdateField;
75use risingwave_pb::user::{GrantPrivilege, UserInfo};
76use risingwave_rpc_client::error::Result as RpcResult;
77use tempfile::{Builder, NamedTempFile};
78
79use crate::FrontendOpts;
80use crate::catalog::catalog_service::CatalogWriter;
81use crate::catalog::root_catalog::Catalog;
82use crate::catalog::{DatabaseId, FragmentId, SchemaId, SecretId, SinkId};
83use crate::error::{ErrorCode, Result, RwError};
84use crate::handler::RwPgResponse;
85use crate::meta_client::FrontendMetaClient;
86use crate::scheduler::HummockSnapshotManagerRef;
87use crate::session::{AuthContext, FrontendEnv, SessionImpl};
88use crate::user::UserId;
89use crate::user::user_manager::UserInfoManager;
90use crate::user::user_service::UserInfoWriter;
91
92pub struct LocalFrontend {
94 pub opts: FrontendOpts,
95 env: FrontendEnv,
96}
97
98impl SessionManager for LocalFrontend {
99 type Error = RwError;
100 type Session = SessionImpl;
101
102 fn create_dummy_session(
103 &self,
104 _database_id: DatabaseId,
105 ) -> std::result::Result<Arc<Self::Session>, Self::Error> {
106 unreachable!()
107 }
108
109 fn connect(
110 &self,
111 _database: &str,
112 _user_name: &str,
113 _peer_addr: AddressRef,
114 ) -> std::result::Result<Arc<Self::Session>, Self::Error> {
115 Ok(self.session_ref())
116 }
117
118 fn cancel_queries_in_session(&self, _session_id: SessionId) {
119 unreachable!()
120 }
121
122 fn cancel_creating_jobs_in_session(&self, _session_id: SessionId) {
123 unreachable!()
124 }
125
126 fn end_session(&self, _session: &Self::Session) {
127 unreachable!()
128 }
129}
130
131impl LocalFrontend {
132 #[expect(clippy::unused_async)]
133 pub async fn new(opts: FrontendOpts) -> Self {
134 let env = FrontendEnv::mock();
135 Self { opts, env }
136 }
137
138 pub async fn run_sql(
139 &self,
140 sql: impl Into<String>,
141 ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
142 let sql: Arc<str> = Arc::from(sql.into());
143 self.session_ref().run_statement(sql, vec![]).await
144 }
145
146 pub async fn run_sql_with_session(
147 &self,
148 session_ref: Arc<SessionImpl>,
149 sql: impl Into<String>,
150 ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
151 let sql: Arc<str> = Arc::from(sql.into());
152 session_ref.run_statement(sql, vec![]).await
153 }
154
155 pub async fn run_user_sql(
156 &self,
157 sql: impl Into<String>,
158 database: String,
159 user_name: String,
160 user_id: UserId,
161 ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
162 let sql: Arc<str> = Arc::from(sql.into());
163 self.session_user_ref(database, user_name, user_id)
164 .run_statement(sql, vec![])
165 .await
166 }
167
168 pub async fn query_formatted_result(&self, sql: impl Into<String>) -> Vec<String> {
169 let mut rsp = self.run_sql(sql).await.unwrap();
170 let mut res = vec![];
171 #[for_await]
172 for row_set in rsp.values_stream() {
173 for row in row_set.unwrap() {
174 res.push(format!("{:?}", row));
175 }
176 }
177 res
178 }
179
180 pub async fn get_explain_output(&self, sql: impl Into<String>) -> String {
181 let mut rsp = self.run_sql(sql).await.unwrap();
182 assert_eq!(rsp.stmt_type(), StatementType::EXPLAIN);
183 let mut res = String::new();
184 #[for_await]
185 for row_set in rsp.values_stream() {
186 for row in row_set.unwrap() {
187 let row: Row = row;
188 let row = row.values()[0].as_ref().unwrap();
189 res += std::str::from_utf8(row).unwrap();
190 res += "\n";
191 }
192 }
193 res
194 }
195
196 pub fn session_ref(&self) -> Arc<SessionImpl> {
198 self.session_user_ref(
199 DEFAULT_DATABASE_NAME.to_owned(),
200 DEFAULT_SUPER_USER.to_owned(),
201 DEFAULT_SUPER_USER_ID,
202 )
203 }
204
205 pub fn session_user_ref(
206 &self,
207 database: String,
208 user_name: String,
209 user_id: UserId,
210 ) -> Arc<SessionImpl> {
211 Arc::new(SessionImpl::new(
212 self.env.clone(),
213 AuthContext::new(database, user_name, user_id),
214 UserAuthenticator::None,
215 (0, 0),
217 Address::Tcp(SocketAddr::new(
218 IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
219 6666,
220 ))
221 .into(),
222 Default::default(),
223 ))
224 }
225}
226
227pub async fn get_explain_output(mut rsp: RwPgResponse) -> String {
228 if rsp.stmt_type() != StatementType::EXPLAIN {
229 panic!("RESPONSE INVALID: {rsp:?}");
230 }
231 let mut res = String::new();
232 #[for_await]
233 for row_set in rsp.values_stream() {
234 for row in row_set.unwrap() {
235 let row: Row = row;
236 let row = row.values()[0].as_ref().unwrap();
237 res += std::str::from_utf8(row).unwrap();
238 res += "\n";
239 }
240 }
241 res
242}
243
244pub struct MockCatalogWriter {
245 catalog: Arc<RwLock<Catalog>>,
246 id: AtomicU32,
247 table_id_to_schema_id: RwLock<HashMap<u32, SchemaId>>,
248 schema_id_to_database_id: RwLock<HashMap<SchemaId, DatabaseId>>,
249 hummock_snapshot_manager: HummockSnapshotManagerRef,
250}
251
252#[async_trait::async_trait]
253impl CatalogWriter for MockCatalogWriter {
254 async fn create_database(
255 &self,
256 db_name: &str,
257 owner: UserId,
258 resource_group: &str,
259 barrier_interval_ms: Option<u32>,
260 checkpoint_frequency: Option<u64>,
261 ) -> Result<()> {
262 let database_id = DatabaseId::new(self.gen_id());
263 self.catalog.write().create_database(&PbDatabase {
264 name: db_name.to_owned(),
265 id: database_id,
266 owner,
267 resource_group: resource_group.to_owned(),
268 barrier_interval_ms,
269 checkpoint_frequency,
270 });
271 self.create_schema(database_id, DEFAULT_SCHEMA_NAME, owner)
272 .await?;
273 self.create_schema(database_id, PG_CATALOG_SCHEMA_NAME, owner)
274 .await?;
275 self.create_schema(database_id, RW_CATALOG_SCHEMA_NAME, owner)
276 .await?;
277 Ok(())
278 }
279
280 async fn create_schema(
281 &self,
282 db_id: DatabaseId,
283 schema_name: &str,
284 owner: UserId,
285 ) -> Result<()> {
286 let id = self.gen_id();
287 self.catalog.write().create_schema(&PbSchema {
288 id,
289 name: schema_name.to_owned(),
290 database_id: db_id,
291 owner,
292 });
293 self.add_schema_id(id, db_id);
294 Ok(())
295 }
296
297 async fn create_materialized_view(
298 &self,
299 mut table: PbTable,
300 _graph: StreamFragmentGraph,
301 _dependencies: HashSet<ObjectId>,
302 _resource_type: streaming_job_resource_type::ResourceType,
303 _if_not_exists: bool,
304 ) -> Result<()> {
305 table.id = self.gen_id();
306 table.stream_job_status = PbStreamJobStatus::Created as _;
307 table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
308 self.catalog.write().create_table(&table);
309 self.add_table_or_source_id(table.id.as_raw_id(), table.schema_id, table.database_id);
310 self.hummock_snapshot_manager.add_table_for_test(table.id);
311 Ok(())
312 }
313
314 async fn replace_materialized_view(
315 &self,
316 mut table: PbTable,
317 _graph: StreamFragmentGraph,
318 ) -> Result<()> {
319 table.stream_job_status = PbStreamJobStatus::Created as _;
320 assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
321 self.catalog.write().update_table(&table);
322 Ok(())
323 }
324
325 async fn create_view(&self, mut view: PbView, _dependencies: HashSet<ObjectId>) -> Result<()> {
326 view.id = self.gen_id();
327 self.catalog.write().create_view(&view);
328 self.add_table_or_source_id(view.id.as_raw_id(), view.schema_id, view.database_id);
329 Ok(())
330 }
331
332 async fn create_table(
333 &self,
334 source: Option<PbSource>,
335 mut table: PbTable,
336 graph: StreamFragmentGraph,
337 _job_type: PbTableJobType,
338 if_not_exists: bool,
339 _dependencies: HashSet<ObjectId>,
340 ) -> Result<()> {
341 if let Some(source) = source {
342 let source_id = self.create_source_inner(source)?;
343 table.optional_associated_source_id = Some(source_id.into());
344 }
345 self.create_materialized_view(
346 table,
347 graph,
348 HashSet::new(),
349 streaming_job_resource_type::ResourceType::Regular(true),
350 if_not_exists,
351 )
352 .await?;
353 Ok(())
354 }
355
356 async fn replace_table(
357 &self,
358 _source: Option<PbSource>,
359 mut table: PbTable,
360 _graph: StreamFragmentGraph,
361 _job_type: TableJobType,
362 ) -> Result<()> {
363 table.stream_job_status = PbStreamJobStatus::Created as _;
364 assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
365 self.catalog.write().update_table(&table);
366 Ok(())
367 }
368
369 async fn replace_source(&self, source: PbSource, _graph: StreamFragmentGraph) -> Result<()> {
370 self.catalog.write().update_source(&source);
371 Ok(())
372 }
373
374 async fn create_source(
375 &self,
376 source: PbSource,
377 _graph: Option<StreamFragmentGraph>,
378 _if_not_exists: bool,
379 ) -> Result<()> {
380 self.create_source_inner(source).map(|_| ())
381 }
382
383 async fn create_sink(
384 &self,
385 sink: PbSink,
386 graph: StreamFragmentGraph,
387 _dependencies: HashSet<ObjectId>,
388 _if_not_exists: bool,
389 ) -> Result<()> {
390 self.create_sink_inner(sink, graph)
391 }
392
393 async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
394 self.create_subscription_inner(subscription)
395 }
396
397 async fn create_index(
398 &self,
399 mut index: PbIndex,
400 mut index_table: PbTable,
401 _graph: StreamFragmentGraph,
402 _if_not_exists: bool,
403 ) -> Result<()> {
404 index_table.id = self.gen_id();
405 index_table.stream_job_status = PbStreamJobStatus::Created as _;
406 index_table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
407 self.catalog.write().create_table(&index_table);
408 self.add_table_or_index_id(
409 index_table.id.as_raw_id(),
410 index_table.schema_id,
411 index_table.database_id,
412 );
413
414 index.id = index_table.id.as_raw_id().into();
415 index.index_table_id = index_table.id;
416 self.catalog.write().create_index(&index);
417 Ok(())
418 }
419
420 async fn create_function(&self, _function: PbFunction) -> Result<()> {
421 unreachable!()
422 }
423
424 async fn create_connection(
425 &self,
426 _connection_name: String,
427 _database_id: DatabaseId,
428 _schema_id: SchemaId,
429 _owner_id: UserId,
430 _connection: create_connection_request::Payload,
431 ) -> Result<()> {
432 unreachable!()
433 }
434
435 async fn create_secret(
436 &self,
437 _secret_name: String,
438 _database_id: DatabaseId,
439 _schema_id: SchemaId,
440 _owner_id: UserId,
441 _payload: Vec<u8>,
442 ) -> Result<()> {
443 unreachable!()
444 }
445
446 async fn comment_on(&self, _comment: PbComment) -> Result<()> {
447 unreachable!()
448 }
449
450 async fn drop_table(
451 &self,
452 source_id: Option<SourceId>,
453 table_id: TableId,
454 cascade: bool,
455 ) -> Result<()> {
456 if cascade {
457 return Err(ErrorCode::NotSupported(
458 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
459 "use drop instead".to_owned(),
460 )
461 .into());
462 }
463 if let Some(source_id) = source_id {
464 self.drop_table_or_source_id(source_id.as_raw_id());
465 }
466 let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
467 let indexes =
468 self.catalog
469 .read()
470 .get_all_indexes_related_to_object(database_id, schema_id, table_id);
471 for index in indexes {
472 self.drop_index(index.id, cascade).await?;
473 }
474 self.catalog
475 .write()
476 .drop_table(database_id, schema_id, table_id);
477 if let Some(source_id) = source_id {
478 self.catalog
479 .write()
480 .drop_source(database_id, schema_id, source_id);
481 }
482 Ok(())
483 }
484
485 async fn drop_view(&self, _view_id: ViewId, _cascade: bool) -> Result<()> {
486 unreachable!()
487 }
488
489 async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
490 if cascade {
491 return Err(ErrorCode::NotSupported(
492 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
493 "use drop instead".to_owned(),
494 )
495 .into());
496 }
497 let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
498 let indexes =
499 self.catalog
500 .read()
501 .get_all_indexes_related_to_object(database_id, schema_id, table_id);
502 for index in indexes {
503 self.drop_index(index.id, cascade).await?;
504 }
505 self.catalog
506 .write()
507 .drop_table(database_id, schema_id, table_id);
508 Ok(())
509 }
510
511 async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()> {
512 if cascade {
513 return Err(ErrorCode::NotSupported(
514 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
515 "use drop instead".to_owned(),
516 )
517 .into());
518 }
519 let (database_id, schema_id) = self.drop_table_or_source_id(source_id.as_raw_id());
520 self.catalog
521 .write()
522 .drop_source(database_id, schema_id, source_id);
523 Ok(())
524 }
525
526 async fn reset_source(&self, _source_id: SourceId) -> Result<()> {
527 Ok(())
528 }
529
530 async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()> {
531 if cascade {
532 return Err(ErrorCode::NotSupported(
533 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
534 "use drop instead".to_owned(),
535 )
536 .into());
537 }
538 let (database_id, schema_id) = self.drop_table_or_sink_id(sink_id.as_raw_id());
539 self.catalog
540 .write()
541 .drop_sink(database_id, schema_id, sink_id);
542 Ok(())
543 }
544
545 async fn drop_subscription(
546 &self,
547 subscription_id: SubscriptionId,
548 cascade: bool,
549 ) -> Result<()> {
550 if cascade {
551 return Err(ErrorCode::NotSupported(
552 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
553 "use drop instead".to_owned(),
554 )
555 .into());
556 }
557 let (database_id, schema_id) =
558 self.drop_table_or_subscription_id(subscription_id.as_raw_id());
559 self.catalog
560 .write()
561 .drop_subscription(database_id, schema_id, subscription_id);
562 Ok(())
563 }
564
565 async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
566 if cascade {
567 return Err(ErrorCode::NotSupported(
568 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
569 "use drop instead".to_owned(),
570 )
571 .into());
572 }
573 let &schema_id = self
574 .table_id_to_schema_id
575 .read()
576 .get(&index_id.as_raw_id())
577 .unwrap();
578 let database_id = self.get_database_id_by_schema(schema_id);
579
580 let index = {
581 let catalog_reader = self.catalog.read();
582 let schema_catalog = catalog_reader
583 .get_schema_by_id(database_id, schema_id)
584 .unwrap();
585 schema_catalog.get_index_by_id(index_id).unwrap().clone()
586 };
587
588 let index_table_id = index.index_table().id;
589 let (database_id, schema_id) = self.drop_table_or_index_id(index_id.as_raw_id());
590 self.catalog
591 .write()
592 .drop_index(database_id, schema_id, index_id);
593 self.catalog
594 .write()
595 .drop_table(database_id, schema_id, index_table_id);
596 Ok(())
597 }
598
599 async fn drop_function(&self, _function_id: FunctionId, _cascade: bool) -> Result<()> {
600 unreachable!()
601 }
602
603 async fn drop_connection(&self, _connection_id: ConnectionId, _cascade: bool) -> Result<()> {
604 unreachable!()
605 }
606
607 async fn drop_secret(&self, _secret_id: SecretId, _cascade: bool) -> Result<()> {
608 unreachable!()
609 }
610
611 async fn drop_database(&self, database_id: DatabaseId) -> Result<()> {
612 self.catalog.write().drop_database(database_id);
613 Ok(())
614 }
615
616 async fn drop_schema(&self, schema_id: SchemaId, _cascade: bool) -> Result<()> {
617 let database_id = self.drop_schema_id(schema_id);
618 self.catalog.write().drop_schema(database_id, schema_id);
619 Ok(())
620 }
621
622 async fn alter_name(
623 &self,
624 object_id: alter_name_request::Object,
625 object_name: &str,
626 ) -> Result<()> {
627 match object_id {
628 alter_name_request::Object::TableId(table_id) => {
629 self.catalog
630 .write()
631 .alter_table_name_by_id(table_id, object_name);
632 Ok(())
633 }
634 _ => {
635 unimplemented!()
636 }
637 }
638 }
639
640 async fn alter_source(&self, source: PbSource) -> Result<()> {
641 self.catalog.write().update_source(&source);
642 Ok(())
643 }
644
645 async fn alter_owner(&self, object: Object, owner_id: UserId) -> Result<()> {
646 for database in self.catalog.read().iter_databases() {
647 for schema in database.iter_schemas() {
648 match object {
649 Object::TableId(table_id) => {
650 if let Some(table) = schema.get_created_table_by_id(TableId::from(table_id))
651 {
652 let mut pb_table = table.to_prost();
653 pb_table.owner = owner_id;
654 self.catalog.write().update_table(&pb_table);
655 return Ok(());
656 }
657 }
658 _ => unreachable!(),
659 }
660 }
661 }
662
663 Err(ErrorCode::ItemNotFound(format!("object not found: {:?}", object)).into())
664 }
665
666 async fn alter_set_schema(
667 &self,
668 object: alter_set_schema_request::Object,
669 new_schema_id: SchemaId,
670 ) -> Result<()> {
671 match object {
672 alter_set_schema_request::Object::TableId(table_id) => {
673 let mut pb_table = {
674 let reader = self.catalog.read();
675 let table = reader.get_any_table_by_id(table_id)?.to_owned();
676 table.to_prost()
677 };
678 pb_table.schema_id = new_schema_id;
679 self.catalog.write().update_table(&pb_table);
680 self.table_id_to_schema_id
681 .write()
682 .insert(table_id.as_raw_id(), new_schema_id);
683 Ok(())
684 }
685 _ => unreachable!(),
686 }
687 }
688
689 async fn alter_parallelism(
690 &self,
691 _job_id: JobId,
692 _parallelism: PbTableParallelism,
693 _deferred: bool,
694 ) -> Result<()> {
695 todo!()
696 }
697
698 async fn alter_config(
699 &self,
700 _job_id: JobId,
701 _entries_to_add: HashMap<String, String>,
702 _keys_to_remove: Vec<String>,
703 ) -> Result<()> {
704 todo!()
705 }
706
707 async fn alter_swap_rename(&self, _object: alter_swap_rename_request::Object) -> Result<()> {
708 todo!()
709 }
710
711 async fn alter_secret(
712 &self,
713 _secret_id: SecretId,
714 _secret_name: String,
715 _database_id: DatabaseId,
716 _schema_id: SchemaId,
717 _owner_id: UserId,
718 _payload: Vec<u8>,
719 ) -> Result<()> {
720 unreachable!()
721 }
722
723 async fn alter_resource_group(
724 &self,
725 _table_id: TableId,
726 _resource_group: Option<String>,
727 _deferred: bool,
728 ) -> Result<()> {
729 todo!()
730 }
731
732 async fn alter_database_param(
733 &self,
734 database_id: DatabaseId,
735 param: AlterDatabaseParam,
736 ) -> Result<()> {
737 let mut pb_database = {
738 let reader = self.catalog.read();
739 let database = reader.get_database_by_id(database_id)?.to_owned();
740 database.to_prost()
741 };
742 match param {
743 AlterDatabaseParam::BarrierIntervalMs(interval) => {
744 pb_database.barrier_interval_ms = interval;
745 }
746 AlterDatabaseParam::CheckpointFrequency(frequency) => {
747 pb_database.checkpoint_frequency = frequency;
748 }
749 }
750 self.catalog.write().update_database(&pb_database);
751 Ok(())
752 }
753
754 async fn create_iceberg_table(
755 &self,
756 _table_job_info: PbTableJobInfo,
757 _sink_job_info: PbSinkJobInfo,
758 _iceberg_source: PbSource,
759 _if_not_exists: bool,
760 ) -> Result<()> {
761 todo!()
762 }
763
764 async fn wait(&self) -> Result<()> {
765 Ok(())
766 }
767}
768
769impl MockCatalogWriter {
770 pub fn new(
771 catalog: Arc<RwLock<Catalog>>,
772 hummock_snapshot_manager: HummockSnapshotManagerRef,
773 ) -> Self {
774 catalog.write().create_database(&PbDatabase {
775 id: 0.into(),
776 name: DEFAULT_DATABASE_NAME.to_owned(),
777 owner: DEFAULT_SUPER_USER_ID,
778 resource_group: DEFAULT_RESOURCE_GROUP.to_owned(),
779 barrier_interval_ms: None,
780 checkpoint_frequency: None,
781 });
782 catalog.write().create_schema(&PbSchema {
783 id: 1.into(),
784 name: DEFAULT_SCHEMA_NAME.to_owned(),
785 database_id: 0.into(),
786 owner: DEFAULT_SUPER_USER_ID,
787 });
788 catalog.write().create_schema(&PbSchema {
789 id: 2.into(),
790 name: PG_CATALOG_SCHEMA_NAME.to_owned(),
791 database_id: 0.into(),
792 owner: DEFAULT_SUPER_USER_ID,
793 });
794 catalog.write().create_schema(&PbSchema {
795 id: 3.into(),
796 name: RW_CATALOG_SCHEMA_NAME.to_owned(),
797 database_id: 0.into(),
798 owner: DEFAULT_SUPER_USER_ID,
799 });
800 let mut map: HashMap<SchemaId, DatabaseId> = HashMap::new();
801 map.insert(1_u32.into(), 0_u32.into());
802 map.insert(2_u32.into(), 0_u32.into());
803 map.insert(3_u32.into(), 0_u32.into());
804 Self {
805 catalog,
806 id: AtomicU32::new(3),
807 table_id_to_schema_id: Default::default(),
808 schema_id_to_database_id: RwLock::new(map),
809 hummock_snapshot_manager,
810 }
811 }
812
813 fn gen_id<T: From<u32>>(&self) -> T {
814 (self.id.fetch_add(1, Ordering::SeqCst) + 1).into()
816 }
817
818 fn add_table_or_source_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
819 self.table_id_to_schema_id
820 .write()
821 .insert(table_id, schema_id);
822 }
823
824 fn drop_table_or_source_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
825 let schema_id = self
826 .table_id_to_schema_id
827 .write()
828 .remove(&table_id)
829 .unwrap();
830 (self.get_database_id_by_schema(schema_id), schema_id)
831 }
832
833 fn add_table_or_sink_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
834 self.table_id_to_schema_id
835 .write()
836 .insert(table_id, schema_id);
837 }
838
839 fn add_table_or_subscription_id(
840 &self,
841 table_id: u32,
842 schema_id: SchemaId,
843 _database_id: DatabaseId,
844 ) {
845 self.table_id_to_schema_id
846 .write()
847 .insert(table_id, schema_id);
848 }
849
850 fn add_table_or_index_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
851 self.table_id_to_schema_id
852 .write()
853 .insert(table_id, schema_id);
854 }
855
856 fn drop_table_or_sink_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
857 let schema_id = self
858 .table_id_to_schema_id
859 .write()
860 .remove(&table_id)
861 .unwrap();
862 (self.get_database_id_by_schema(schema_id), schema_id)
863 }
864
865 fn drop_table_or_subscription_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
866 let schema_id = self
867 .table_id_to_schema_id
868 .write()
869 .remove(&table_id)
870 .unwrap();
871 (self.get_database_id_by_schema(schema_id), schema_id)
872 }
873
874 fn drop_table_or_index_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
875 let schema_id = self
876 .table_id_to_schema_id
877 .write()
878 .remove(&table_id)
879 .unwrap();
880 (self.get_database_id_by_schema(schema_id), schema_id)
881 }
882
883 fn add_schema_id(&self, schema_id: SchemaId, database_id: DatabaseId) {
884 self.schema_id_to_database_id
885 .write()
886 .insert(schema_id, database_id);
887 }
888
889 fn drop_schema_id(&self, schema_id: SchemaId) -> DatabaseId {
890 self.schema_id_to_database_id
891 .write()
892 .remove(&schema_id)
893 .unwrap()
894 }
895
896 fn create_source_inner(&self, mut source: PbSource) -> Result<SourceId> {
897 source.id = self.gen_id();
898 self.catalog.write().create_source(&source);
899 self.add_table_or_source_id(source.id.as_raw_id(), source.schema_id, source.database_id);
900 Ok(source.id)
901 }
902
903 fn create_sink_inner(&self, mut sink: PbSink, _graph: StreamFragmentGraph) -> Result<()> {
904 sink.id = self.gen_id();
905 sink.stream_job_status = PbStreamJobStatus::Created as _;
906 self.catalog.write().create_sink(&sink);
907 self.add_table_or_sink_id(sink.id.as_raw_id(), sink.schema_id, sink.database_id);
908 Ok(())
909 }
910
911 fn create_subscription_inner(&self, mut subscription: PbSubscription) -> Result<()> {
912 subscription.id = self.gen_id();
913 self.catalog.write().create_subscription(&subscription);
914 self.add_table_or_subscription_id(
915 subscription.id.as_raw_id(),
916 subscription.schema_id,
917 subscription.database_id,
918 );
919 Ok(())
920 }
921
922 fn get_database_id_by_schema(&self, schema_id: SchemaId) -> DatabaseId {
923 *self
924 .schema_id_to_database_id
925 .read()
926 .get(&schema_id)
927 .unwrap()
928 }
929}
930
931pub struct MockUserInfoWriter {
932 id: AtomicU32,
933 user_info: Arc<RwLock<UserInfoManager>>,
934}
935
936#[async_trait::async_trait]
937impl UserInfoWriter for MockUserInfoWriter {
938 async fn create_user(&self, user: UserInfo) -> Result<()> {
939 let mut user = user;
940 user.id = self.gen_id().into();
941 self.user_info.write().create_user(user);
942 Ok(())
943 }
944
945 async fn drop_user(&self, id: UserId) -> Result<()> {
946 self.user_info.write().drop_user(id);
947 Ok(())
948 }
949
950 async fn update_user(
951 &self,
952 update_user: UserInfo,
953 update_fields: Vec<UpdateField>,
954 ) -> Result<()> {
955 let mut lock = self.user_info.write();
956 let id = update_user.get_id();
957 let Some(old_name) = lock.get_user_name_by_id(id) else {
958 return Ok(());
959 };
960 let mut user_info = lock.get_user_by_name(&old_name).unwrap().to_prost();
961 update_fields.into_iter().for_each(|field| match field {
962 UpdateField::Super => user_info.is_super = update_user.is_super,
963 UpdateField::Login => user_info.can_login = update_user.can_login,
964 UpdateField::CreateDb => user_info.can_create_db = update_user.can_create_db,
965 UpdateField::CreateUser => user_info.can_create_user = update_user.can_create_user,
966 UpdateField::AuthInfo => user_info.auth_info.clone_from(&update_user.auth_info),
967 UpdateField::Rename => user_info.name.clone_from(&update_user.name),
968 UpdateField::Admin => user_info.is_admin = update_user.is_admin,
969 UpdateField::Unspecified => unreachable!(),
970 });
971 lock.update_user(update_user);
972 Ok(())
973 }
974
975 async fn grant_privilege(
978 &self,
979 users: Vec<UserId>,
980 privileges: Vec<GrantPrivilege>,
981 with_grant_option: bool,
982 _grantor: UserId,
983 ) -> Result<()> {
984 let privileges = privileges
985 .into_iter()
986 .map(|mut p| {
987 p.action_with_opts
988 .iter_mut()
989 .for_each(|ao| ao.with_grant_option = with_grant_option);
990 p
991 })
992 .collect::<Vec<_>>();
993 for user_id in users {
994 if let Some(u) = self.user_info.write().get_user_mut(user_id) {
995 u.extend_privileges(privileges.clone());
996 }
997 }
998 Ok(())
999 }
1000
1001 async fn revoke_privilege(
1004 &self,
1005 users: Vec<UserId>,
1006 privileges: Vec<GrantPrivilege>,
1007 _granted_by: UserId,
1008 _revoke_by: UserId,
1009 revoke_grant_option: bool,
1010 _cascade: bool,
1011 ) -> Result<()> {
1012 for user_id in users {
1013 if let Some(u) = self.user_info.write().get_user_mut(user_id) {
1014 u.revoke_privileges(privileges.clone(), revoke_grant_option);
1015 }
1016 }
1017 Ok(())
1018 }
1019
1020 async fn alter_default_privilege(
1021 &self,
1022 _users: Vec<UserId>,
1023 _database_id: DatabaseId,
1024 _schemas: Vec<SchemaId>,
1025 _operation: AlterDefaultPrivilegeOperation,
1026 _operated_by: UserId,
1027 ) -> Result<()> {
1028 todo!()
1029 }
1030}
1031
1032impl MockUserInfoWriter {
1033 pub fn new(user_info: Arc<RwLock<UserInfoManager>>) -> Self {
1034 user_info.write().create_user(UserInfo {
1035 id: DEFAULT_SUPER_USER_ID,
1036 name: DEFAULT_SUPER_USER.to_owned(),
1037 is_super: true,
1038 can_create_db: true,
1039 can_create_user: true,
1040 can_login: true,
1041 ..Default::default()
1042 });
1043 user_info.write().create_user(UserInfo {
1044 id: DEFAULT_SUPER_USER_FOR_ADMIN_ID,
1045 name: DEFAULT_SUPER_USER_FOR_ADMIN.to_owned(),
1046 is_super: true,
1047 can_create_db: true,
1048 can_create_user: true,
1049 can_login: true,
1050 is_admin: true,
1051 ..Default::default()
1052 });
1053 Self {
1054 user_info,
1055 id: AtomicU32::new(NON_RESERVED_USER_ID.as_raw_id()),
1056 }
1057 }
1058
1059 fn gen_id(&self) -> u32 {
1060 self.id.fetch_add(1, Ordering::SeqCst)
1061 }
1062}
1063
1064pub struct MockFrontendMetaClient {}
1065
1066#[async_trait::async_trait]
1067impl FrontendMetaClient for MockFrontendMetaClient {
1068 async fn try_unregister(&self) {}
1069
1070 async fn flush(&self, _database_id: DatabaseId) -> RpcResult<HummockVersionId> {
1071 Ok(INVALID_VERSION_ID)
1072 }
1073
1074 async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult<Vec<u32>> {
1075 Ok(vec![])
1076 }
1077
1078 async fn list_table_fragments(
1079 &self,
1080 _table_ids: &[JobId],
1081 ) -> RpcResult<HashMap<JobId, TableFragmentInfo>> {
1082 Ok(HashMap::default())
1083 }
1084
1085 async fn list_streaming_job_states(&self) -> RpcResult<Vec<StreamingJobState>> {
1086 Ok(vec![])
1087 }
1088
1089 async fn list_fragment_distribution(
1090 &self,
1091 _include_node: bool,
1092 ) -> RpcResult<Vec<FragmentDistribution>> {
1093 Ok(vec![])
1094 }
1095
1096 async fn list_creating_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
1097 Ok(vec![])
1098 }
1099
1100 async fn list_actor_states(&self) -> RpcResult<Vec<ActorState>> {
1101 Ok(vec![])
1102 }
1103
1104 async fn list_actor_splits(&self) -> RpcResult<Vec<ActorSplit>> {
1105 Ok(vec![])
1106 }
1107
1108 async fn list_meta_snapshots(&self) -> RpcResult<Vec<MetaSnapshotMetadata>> {
1109 Ok(vec![])
1110 }
1111
1112 async fn list_sink_log_store_tables(
1113 &self,
1114 ) -> RpcResult<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>> {
1115 Ok(vec![])
1116 }
1117
1118 async fn set_system_param(
1119 &self,
1120 _param: String,
1121 _value: Option<String>,
1122 ) -> RpcResult<Option<SystemParamsReader>> {
1123 Ok(Some(SystemParams::default().into()))
1124 }
1125
1126 async fn get_session_params(&self) -> RpcResult<SessionConfig> {
1127 Ok(Default::default())
1128 }
1129
1130 async fn set_session_param(&self, _param: String, _value: Option<String>) -> RpcResult<String> {
1131 Ok("".to_owned())
1132 }
1133
1134 async fn get_ddl_progress(&self) -> RpcResult<Vec<DdlProgress>> {
1135 Ok(vec![])
1136 }
1137
1138 async fn get_tables(
1139 &self,
1140 _table_ids: Vec<crate::catalog::TableId>,
1141 _include_dropped_tables: bool,
1142 ) -> RpcResult<HashMap<crate::catalog::TableId, Table>> {
1143 Ok(HashMap::new())
1144 }
1145
1146 async fn list_hummock_pinned_versions(&self) -> RpcResult<Vec<(WorkerId, HummockVersionId)>> {
1147 unimplemented!()
1148 }
1149
1150 async fn list_refresh_table_states(&self) -> RpcResult<Vec<RefreshTableState>> {
1151 unimplemented!()
1152 }
1153
1154 async fn get_hummock_current_version(&self) -> RpcResult<HummockVersion> {
1155 Ok(HummockVersion::default())
1156 }
1157
1158 async fn get_hummock_checkpoint_version(&self) -> RpcResult<HummockVersion> {
1159 unimplemented!()
1160 }
1161
1162 async fn list_version_deltas(&self) -> RpcResult<Vec<HummockVersionDelta>> {
1163 unimplemented!()
1164 }
1165
1166 async fn list_branched_objects(&self) -> RpcResult<Vec<BranchedObject>> {
1167 unimplemented!()
1168 }
1169
1170 async fn list_hummock_compaction_group_configs(&self) -> RpcResult<Vec<CompactionGroupInfo>> {
1171 unimplemented!()
1172 }
1173
1174 async fn list_hummock_active_write_limits(
1175 &self,
1176 ) -> RpcResult<HashMap<CompactionGroupId, WriteLimit>> {
1177 unimplemented!()
1178 }
1179
1180 async fn list_hummock_meta_configs(&self) -> RpcResult<HashMap<String, String>> {
1181 unimplemented!()
1182 }
1183
1184 async fn list_event_log(&self) -> RpcResult<Vec<EventLog>> {
1185 Ok(vec![])
1186 }
1187
1188 async fn list_compact_task_assignment(&self) -> RpcResult<Vec<CompactTaskAssignment>> {
1189 unimplemented!()
1190 }
1191
1192 async fn list_all_nodes(&self) -> RpcResult<Vec<WorkerNode>> {
1193 Ok(vec![])
1194 }
1195
1196 async fn list_compact_task_progress(&self) -> RpcResult<Vec<CompactTaskProgress>> {
1197 unimplemented!()
1198 }
1199
1200 async fn recover(&self) -> RpcResult<()> {
1201 unimplemented!()
1202 }
1203
1204 async fn apply_throttle(
1205 &self,
1206 _throttle_target: PbThrottleTarget,
1207 _throttle_type: risingwave_pb::common::PbThrottleType,
1208 _id: u32,
1209 _rate_limit: Option<u32>,
1210 ) -> RpcResult<()> {
1211 unimplemented!()
1212 }
1213
1214 async fn alter_fragment_parallelism(
1215 &self,
1216 _fragment_ids: Vec<FragmentId>,
1217 _parallelism: Option<PbTableParallelism>,
1218 ) -> RpcResult<()> {
1219 unimplemented!()
1220 }
1221
1222 async fn get_cluster_recovery_status(&self) -> RpcResult<RecoveryStatus> {
1223 Ok(RecoveryStatus::StatusRunning)
1224 }
1225
1226 async fn get_cluster_limits(&self) -> RpcResult<Vec<ClusterLimit>> {
1227 Ok(vec![])
1228 }
1229
1230 async fn list_rate_limits(&self) -> RpcResult<Vec<RateLimitInfo>> {
1231 Ok(vec![])
1232 }
1233
1234 async fn list_cdc_progress(&self) -> RpcResult<HashMap<JobId, PbCdcProgress>> {
1235 Ok(HashMap::default())
1236 }
1237
1238 async fn get_meta_store_endpoint(&self) -> RpcResult<String> {
1239 unimplemented!()
1240 }
1241
1242 async fn alter_sink_props(
1243 &self,
1244 _sink_id: SinkId,
1245 _changed_props: BTreeMap<String, String>,
1246 _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1247 _connector_conn_ref: Option<ConnectionId>,
1248 ) -> RpcResult<()> {
1249 unimplemented!()
1250 }
1251
1252 async fn alter_iceberg_table_props(
1253 &self,
1254 _table_id: TableId,
1255 _sink_id: SinkId,
1256 _source_id: SourceId,
1257 _changed_props: BTreeMap<String, String>,
1258 _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1259 _connector_conn_ref: Option<ConnectionId>,
1260 ) -> RpcResult<()> {
1261 unimplemented!()
1262 }
1263
1264 async fn alter_source_connector_props(
1265 &self,
1266 _source_id: SourceId,
1267 _changed_props: BTreeMap<String, String>,
1268 _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1269 _connector_conn_ref: Option<ConnectionId>,
1270 ) -> RpcResult<()> {
1271 unimplemented!()
1272 }
1273
1274 async fn alter_connection_connector_props(
1275 &self,
1276 _connection_id: u32,
1277 _changed_props: BTreeMap<String, String>,
1278 _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1279 ) -> RpcResult<()> {
1280 Ok(())
1281 }
1282
1283 async fn list_hosted_iceberg_tables(&self) -> RpcResult<Vec<IcebergTable>> {
1284 unimplemented!()
1285 }
1286
1287 async fn get_fragment_by_id(
1288 &self,
1289 _fragment_id: FragmentId,
1290 ) -> RpcResult<Option<FragmentDistribution>> {
1291 unimplemented!()
1292 }
1293
1294 async fn get_fragment_vnodes(
1295 &self,
1296 _fragment_id: FragmentId,
1297 ) -> RpcResult<Vec<(ActorId, Vec<u32>)>> {
1298 unimplemented!()
1299 }
1300
1301 async fn get_actor_vnodes(&self, _actor_id: ActorId) -> RpcResult<Vec<u32>> {
1302 unimplemented!()
1303 }
1304
1305 fn worker_id(&self) -> WorkerId {
1306 0.into()
1307 }
1308
1309 async fn set_sync_log_store_aligned(&self, _job_id: JobId, _aligned: bool) -> RpcResult<()> {
1310 Ok(())
1311 }
1312
1313 async fn compact_iceberg_table(&self, _sink_id: SinkId) -> RpcResult<u64> {
1314 Ok(1)
1315 }
1316
1317 async fn expire_iceberg_table_snapshots(&self, _sink_id: SinkId) -> RpcResult<()> {
1318 Ok(())
1319 }
1320
1321 async fn refresh(&self, _request: RefreshRequest) -> RpcResult<RefreshResponse> {
1322 Ok(RefreshResponse { status: None })
1323 }
1324
1325 fn cluster_id(&self) -> &str {
1326 "test-cluster-uuid"
1327 }
1328
1329 async fn list_unmigrated_tables(&self) -> RpcResult<HashMap<crate::catalog::TableId, String>> {
1330 unimplemented!()
1331 }
1332}
1333
1334#[cfg(test)]
1335pub static PROTO_FILE_DATA: &str = r#"
1336 syntax = "proto3";
1337 package test;
1338 message TestRecord {
1339 int32 id = 1;
1340 Country country = 3;
1341 int64 zipcode = 4;
1342 float rate = 5;
1343 }
1344 message TestRecordAlterType {
1345 string id = 1;
1346 Country country = 3;
1347 int32 zipcode = 4;
1348 float rate = 5;
1349 }
1350 message TestRecordExt {
1351 int32 id = 1;
1352 Country country = 3;
1353 int64 zipcode = 4;
1354 float rate = 5;
1355 string name = 6;
1356 }
1357 message Country {
1358 string address = 1;
1359 City city = 2;
1360 string zipcode = 3;
1361 }
1362 message City {
1363 string address = 1;
1364 string zipcode = 2;
1365 }"#;
1366
1367pub fn create_proto_file(proto_data: &str) -> NamedTempFile {
1370 let in_file = Builder::new()
1371 .prefix("temp")
1372 .suffix(".proto")
1373 .rand_bytes(8)
1374 .tempfile()
1375 .unwrap();
1376
1377 let out_file = Builder::new()
1378 .prefix("temp")
1379 .suffix(".pb")
1380 .rand_bytes(8)
1381 .tempfile()
1382 .unwrap();
1383
1384 let mut file = in_file.as_file();
1385 file.write_all(proto_data.as_ref())
1386 .expect("writing binary to test file");
1387 file.flush().expect("flush temp file failed");
1388 let include_path = in_file
1389 .path()
1390 .parent()
1391 .unwrap()
1392 .to_string_lossy()
1393 .into_owned();
1394 let out_path = out_file.path().to_string_lossy().into_owned();
1395 let in_path = in_file.path().to_string_lossy().into_owned();
1396 let mut compile = std::process::Command::new("protoc");
1397
1398 let out = compile
1399 .arg("--include_imports")
1400 .arg("-I")
1401 .arg(include_path)
1402 .arg(format!("--descriptor_set_out={}", out_path))
1403 .arg(in_path)
1404 .output()
1405 .expect("failed to compile proto");
1406 if !out.status.success() {
1407 panic!("compile proto failed \n output: {:?}", out);
1408 }
1409 out_file
1410}