1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::io::Write;
17use std::net::{IpAddr, Ipv4Addr, SocketAddr};
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU32, Ordering};
20
21use futures_async_stream::for_await;
22use parking_lot::RwLock;
23use pgwire::net::{Address, AddressRef};
24use pgwire::pg_response::StatementType;
25use pgwire::pg_server::{SessionId, SessionManager, UserAuthenticator};
26use pgwire::types::Row;
27use risingwave_common::catalog::{
28 AlterDatabaseParam, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER,
29 DEFAULT_SUPER_USER_FOR_ADMIN, DEFAULT_SUPER_USER_FOR_ADMIN_ID, DEFAULT_SUPER_USER_ID,
30 FunctionId, IndexId, NON_RESERVED_USER_ID, ObjectId, PG_CATALOG_SCHEMA_NAME,
31 RW_CATALOG_SCHEMA_NAME, TableId,
32};
33use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat};
34use risingwave_common::id::{ConnectionId, JobId, SourceId, SubscriptionId, ViewId, WorkerId};
35use risingwave_common::session_config::SessionConfig;
36use risingwave_common::system_param::reader::SystemParamsReader;
37use risingwave_common::util::cluster_limit::ClusterLimit;
38use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
39use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
40use risingwave_hummock_sdk::{HummockVersionId, INVALID_VERSION_ID};
41use risingwave_pb::backup_service::MetaSnapshotMetadata;
42use risingwave_pb::catalog::{
43 PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbStreamJobStatus,
44 PbSubscription, PbTable, PbView, Table,
45};
46use risingwave_pb::common::WorkerNode;
47use risingwave_pb::ddl_service::alter_owner_request::Object;
48use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
49use risingwave_pb::ddl_service::{
50 DdlProgress, PbTableJobType, TableJobType, alter_name_request, alter_set_schema_request,
51 alter_swap_rename_request, create_connection_request, streaming_job_resource_type,
52};
53use risingwave_pb::hummock::write_limits::WriteLimit;
54use risingwave_pb::hummock::{
55 BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
56};
57use risingwave_pb::id::ActorId;
58use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
59use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
60use risingwave_pb::meta::list_actor_states_response::ActorState;
61use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
62use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
63use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
64use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
65use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
66use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
67use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
68use risingwave_pb::meta::{
69 EventLog, FragmentDistribution, PbTableParallelism, PbThrottleTarget, RecoveryStatus,
70 RefreshRequest, RefreshResponse, SystemParams,
71};
72use risingwave_pb::secret::PbSecretRef;
73use risingwave_pb::stream_plan::StreamFragmentGraph;
74use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
75use risingwave_pb::user::update_user_request::UpdateField;
76use risingwave_pb::user::{GrantPrivilege, UserInfo};
77use risingwave_rpc_client::error::Result as RpcResult;
78use tempfile::{Builder, NamedTempFile};
79
80use crate::FrontendOpts;
81use crate::catalog::catalog_service::CatalogWriter;
82use crate::catalog::root_catalog::Catalog;
83use crate::catalog::{DatabaseId, FragmentId, SchemaId, SecretId, SinkId};
84use crate::error::{ErrorCode, Result, RwError};
85use crate::handler::RwPgResponse;
86use crate::meta_client::FrontendMetaClient;
87use crate::scheduler::HummockSnapshotManagerRef;
88use crate::session::{AuthContext, FrontendEnv, SessionImpl};
89use crate::user::UserId;
90use crate::user::user_manager::UserInfoManager;
91use crate::user::user_service::UserInfoWriter;
92
93pub struct LocalFrontend {
95 pub opts: FrontendOpts,
96 env: FrontendEnv,
97}
98
99impl SessionManager for LocalFrontend {
100 type Error = RwError;
101 type Session = SessionImpl;
102
103 fn create_dummy_session(
104 &self,
105 _database_id: DatabaseId,
106 ) -> std::result::Result<Arc<Self::Session>, Self::Error> {
107 unreachable!()
108 }
109
110 fn connect(
111 &self,
112 _database: &str,
113 _user_name: &str,
114 _peer_addr: AddressRef,
115 ) -> std::result::Result<Arc<Self::Session>, Self::Error> {
116 Ok(self.session_ref())
117 }
118
119 fn cancel_queries_in_session(&self, _session_id: SessionId) {
120 unreachable!()
121 }
122
123 fn cancel_creating_jobs_in_session(&self, _session_id: SessionId) {
124 unreachable!()
125 }
126
127 fn end_session(&self, _session: &Self::Session) {
128 unreachable!()
129 }
130}
131
132impl LocalFrontend {
133 #[expect(clippy::unused_async)]
134 pub async fn new(opts: FrontendOpts) -> Self {
135 let env = FrontendEnv::mock();
136 Self { opts, env }
137 }
138
139 pub async fn run_sql(
140 &self,
141 sql: impl Into<String>,
142 ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
143 let sql: Arc<str> = Arc::from(sql.into());
144 self.session_ref().run_statement(sql, vec![]).await
145 }
146
147 pub async fn run_sql_with_session(
148 &self,
149 session_ref: Arc<SessionImpl>,
150 sql: impl Into<String>,
151 ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
152 let sql: Arc<str> = Arc::from(sql.into());
153 session_ref.run_statement(sql, vec![]).await
154 }
155
156 pub async fn run_user_sql(
157 &self,
158 sql: impl Into<String>,
159 database: String,
160 user_name: String,
161 user_id: UserId,
162 ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
163 let sql: Arc<str> = Arc::from(sql.into());
164 self.session_user_ref(database, user_name, user_id)
165 .run_statement(sql, vec![])
166 .await
167 }
168
169 pub async fn query_formatted_result(&self, sql: impl Into<String>) -> Vec<String> {
170 let mut rsp = self.run_sql(sql).await.unwrap();
171 let mut res = vec![];
172 #[for_await]
173 for row_set in rsp.values_stream() {
174 for row in row_set.unwrap() {
175 res.push(format!("{:?}", row));
176 }
177 }
178 res
179 }
180
181 pub async fn get_explain_output(&self, sql: impl Into<String>) -> String {
182 let mut rsp = self.run_sql(sql).await.unwrap();
183 assert_eq!(rsp.stmt_type(), StatementType::EXPLAIN);
184 let mut res = String::new();
185 #[for_await]
186 for row_set in rsp.values_stream() {
187 for row in row_set.unwrap() {
188 let row: Row = row;
189 let row = row.values()[0].as_ref().unwrap();
190 res += std::str::from_utf8(row).unwrap();
191 res += "\n";
192 }
193 }
194 res
195 }
196
197 pub fn session_ref(&self) -> Arc<SessionImpl> {
199 self.session_user_ref(
200 DEFAULT_DATABASE_NAME.to_owned(),
201 DEFAULT_SUPER_USER.to_owned(),
202 DEFAULT_SUPER_USER_ID,
203 )
204 }
205
206 pub fn session_user_ref(
207 &self,
208 database: String,
209 user_name: String,
210 user_id: UserId,
211 ) -> Arc<SessionImpl> {
212 Arc::new(SessionImpl::new(
213 self.env.clone(),
214 AuthContext::new(database, user_name, user_id),
215 UserAuthenticator::None,
216 (0, 0),
218 Address::Tcp(SocketAddr::new(
219 IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
220 6666,
221 ))
222 .into(),
223 Default::default(),
224 ))
225 }
226}
227
228pub async fn get_explain_output(mut rsp: RwPgResponse) -> String {
229 if rsp.stmt_type() != StatementType::EXPLAIN {
230 panic!("RESPONSE INVALID: {rsp:?}");
231 }
232 let mut res = String::new();
233 #[for_await]
234 for row_set in rsp.values_stream() {
235 for row in row_set.unwrap() {
236 let row: Row = row;
237 let row = row.values()[0].as_ref().unwrap();
238 res += std::str::from_utf8(row).unwrap();
239 res += "\n";
240 }
241 }
242 res
243}
244
245pub struct MockCatalogWriter {
246 catalog: Arc<RwLock<Catalog>>,
247 id: AtomicU32,
248 table_id_to_schema_id: RwLock<HashMap<u32, SchemaId>>,
249 schema_id_to_database_id: RwLock<HashMap<SchemaId, DatabaseId>>,
250 hummock_snapshot_manager: HummockSnapshotManagerRef,
251}
252
253#[async_trait::async_trait]
254impl CatalogWriter for MockCatalogWriter {
255 async fn create_database(
256 &self,
257 db_name: &str,
258 owner: UserId,
259 resource_group: &str,
260 barrier_interval_ms: Option<u32>,
261 checkpoint_frequency: Option<u64>,
262 ) -> Result<()> {
263 let database_id = DatabaseId::new(self.gen_id());
264 self.catalog.write().create_database(&PbDatabase {
265 name: db_name.to_owned(),
266 id: database_id,
267 owner,
268 resource_group: resource_group.to_owned(),
269 barrier_interval_ms,
270 checkpoint_frequency,
271 });
272 self.create_schema(database_id, DEFAULT_SCHEMA_NAME, owner)
273 .await?;
274 self.create_schema(database_id, PG_CATALOG_SCHEMA_NAME, owner)
275 .await?;
276 self.create_schema(database_id, RW_CATALOG_SCHEMA_NAME, owner)
277 .await?;
278 Ok(())
279 }
280
281 async fn create_schema(
282 &self,
283 db_id: DatabaseId,
284 schema_name: &str,
285 owner: UserId,
286 ) -> Result<()> {
287 let id = self.gen_id();
288 self.catalog.write().create_schema(&PbSchema {
289 id,
290 name: schema_name.to_owned(),
291 database_id: db_id,
292 owner,
293 });
294 self.add_schema_id(id, db_id);
295 Ok(())
296 }
297
298 async fn create_materialized_view(
299 &self,
300 mut table: PbTable,
301 _graph: StreamFragmentGraph,
302 _dependencies: HashSet<ObjectId>,
303 _resource_type: streaming_job_resource_type::ResourceType,
304 _if_not_exists: bool,
305 ) -> Result<()> {
306 table.id = self.gen_id();
307 table.stream_job_status = PbStreamJobStatus::Created as _;
308 table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
309 self.catalog.write().create_table(&table);
310 self.add_table_or_source_id(table.id.as_raw_id(), table.schema_id, table.database_id);
311 self.hummock_snapshot_manager.add_table_for_test(table.id);
312 Ok(())
313 }
314
315 async fn replace_materialized_view(
316 &self,
317 mut table: PbTable,
318 _graph: StreamFragmentGraph,
319 ) -> Result<()> {
320 table.stream_job_status = PbStreamJobStatus::Created as _;
321 assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
322 self.catalog.write().update_table(&table);
323 Ok(())
324 }
325
326 async fn create_view(&self, mut view: PbView, _dependencies: HashSet<ObjectId>) -> Result<()> {
327 view.id = self.gen_id();
328 self.catalog.write().create_view(&view);
329 self.add_table_or_source_id(view.id.as_raw_id(), view.schema_id, view.database_id);
330 Ok(())
331 }
332
333 async fn create_table(
334 &self,
335 source: Option<PbSource>,
336 mut table: PbTable,
337 graph: StreamFragmentGraph,
338 _job_type: PbTableJobType,
339 if_not_exists: bool,
340 _dependencies: HashSet<ObjectId>,
341 ) -> Result<()> {
342 if let Some(source) = source {
343 let source_id = self.create_source_inner(source)?;
344 table.optional_associated_source_id = Some(source_id.into());
345 }
346 self.create_materialized_view(
347 table,
348 graph,
349 HashSet::new(),
350 streaming_job_resource_type::ResourceType::Regular(true),
351 if_not_exists,
352 )
353 .await?;
354 Ok(())
355 }
356
357 async fn replace_table(
358 &self,
359 _source: Option<PbSource>,
360 mut table: PbTable,
361 _graph: StreamFragmentGraph,
362 _job_type: TableJobType,
363 ) -> Result<()> {
364 table.stream_job_status = PbStreamJobStatus::Created as _;
365 assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
366 self.catalog.write().update_table(&table);
367 Ok(())
368 }
369
370 async fn replace_source(&self, source: PbSource, _graph: StreamFragmentGraph) -> Result<()> {
371 self.catalog.write().update_source(&source);
372 Ok(())
373 }
374
375 async fn create_source(
376 &self,
377 source: PbSource,
378 _graph: Option<StreamFragmentGraph>,
379 _if_not_exists: bool,
380 ) -> Result<()> {
381 self.create_source_inner(source).map(|_| ())
382 }
383
384 async fn create_sink(
385 &self,
386 sink: PbSink,
387 graph: StreamFragmentGraph,
388 _dependencies: HashSet<ObjectId>,
389 _if_not_exists: bool,
390 ) -> Result<()> {
391 self.create_sink_inner(sink, graph)
392 }
393
394 async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
395 self.create_subscription_inner(subscription)
396 }
397
398 async fn create_index(
399 &self,
400 mut index: PbIndex,
401 mut index_table: PbTable,
402 _graph: StreamFragmentGraph,
403 _if_not_exists: bool,
404 ) -> Result<()> {
405 index_table.id = self.gen_id();
406 index_table.stream_job_status = PbStreamJobStatus::Created as _;
407 index_table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
408 self.catalog.write().create_table(&index_table);
409 self.add_table_or_index_id(
410 index_table.id.as_raw_id(),
411 index_table.schema_id,
412 index_table.database_id,
413 );
414
415 index.id = index_table.id.as_raw_id().into();
416 index.index_table_id = index_table.id;
417 self.catalog.write().create_index(&index);
418 Ok(())
419 }
420
421 async fn create_function(&self, _function: PbFunction) -> Result<()> {
422 unreachable!()
423 }
424
425 async fn create_connection(
426 &self,
427 _connection_name: String,
428 _database_id: DatabaseId,
429 _schema_id: SchemaId,
430 _owner_id: u32,
431 _connection: create_connection_request::Payload,
432 ) -> Result<()> {
433 unreachable!()
434 }
435
436 async fn create_secret(
437 &self,
438 _secret_name: String,
439 _database_id: DatabaseId,
440 _schema_id: SchemaId,
441 _owner_id: u32,
442 _payload: Vec<u8>,
443 ) -> Result<()> {
444 unreachable!()
445 }
446
447 async fn comment_on(&self, _comment: PbComment) -> Result<()> {
448 unreachable!()
449 }
450
451 async fn drop_table(
452 &self,
453 source_id: Option<u32>,
454 table_id: TableId,
455 cascade: bool,
456 ) -> Result<()> {
457 if cascade {
458 return Err(ErrorCode::NotSupported(
459 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
460 "use drop instead".to_owned(),
461 )
462 .into());
463 }
464 if let Some(source_id) = source_id {
465 self.drop_table_or_source_id(source_id);
466 }
467 let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
468 let indexes =
469 self.catalog
470 .read()
471 .get_all_indexes_related_to_object(database_id, schema_id, table_id);
472 for index in indexes {
473 self.drop_index(index.id, cascade).await?;
474 }
475 self.catalog
476 .write()
477 .drop_table(database_id, schema_id, table_id);
478 if let Some(source_id) = source_id {
479 self.catalog
480 .write()
481 .drop_source(database_id, schema_id, source_id.into());
482 }
483 Ok(())
484 }
485
486 async fn drop_view(&self, _view_id: ViewId, _cascade: bool) -> Result<()> {
487 unreachable!()
488 }
489
490 async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
491 if cascade {
492 return Err(ErrorCode::NotSupported(
493 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
494 "use drop instead".to_owned(),
495 )
496 .into());
497 }
498 let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
499 let indexes =
500 self.catalog
501 .read()
502 .get_all_indexes_related_to_object(database_id, schema_id, table_id);
503 for index in indexes {
504 self.drop_index(index.id, cascade).await?;
505 }
506 self.catalog
507 .write()
508 .drop_table(database_id, schema_id, table_id);
509 Ok(())
510 }
511
512 async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()> {
513 if cascade {
514 return Err(ErrorCode::NotSupported(
515 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
516 "use drop instead".to_owned(),
517 )
518 .into());
519 }
520 let (database_id, schema_id) = self.drop_table_or_source_id(source_id.as_raw_id());
521 self.catalog
522 .write()
523 .drop_source(database_id, schema_id, source_id);
524 Ok(())
525 }
526
527 async fn reset_source(&self, _source_id: SourceId) -> Result<()> {
528 Ok(())
529 }
530
531 async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()> {
532 if cascade {
533 return Err(ErrorCode::NotSupported(
534 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
535 "use drop instead".to_owned(),
536 )
537 .into());
538 }
539 let (database_id, schema_id) = self.drop_table_or_sink_id(sink_id.as_raw_id());
540 self.catalog
541 .write()
542 .drop_sink(database_id, schema_id, sink_id);
543 Ok(())
544 }
545
546 async fn drop_subscription(
547 &self,
548 subscription_id: SubscriptionId,
549 cascade: bool,
550 ) -> Result<()> {
551 if cascade {
552 return Err(ErrorCode::NotSupported(
553 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
554 "use drop instead".to_owned(),
555 )
556 .into());
557 }
558 let (database_id, schema_id) =
559 self.drop_table_or_subscription_id(subscription_id.as_raw_id());
560 self.catalog
561 .write()
562 .drop_subscription(database_id, schema_id, subscription_id);
563 Ok(())
564 }
565
566 async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
567 if cascade {
568 return Err(ErrorCode::NotSupported(
569 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
570 "use drop instead".to_owned(),
571 )
572 .into());
573 }
574 let &schema_id = self
575 .table_id_to_schema_id
576 .read()
577 .get(&index_id.as_raw_id())
578 .unwrap();
579 let database_id = self.get_database_id_by_schema(schema_id);
580
581 let index = {
582 let catalog_reader = self.catalog.read();
583 let schema_catalog = catalog_reader
584 .get_schema_by_id(database_id, schema_id)
585 .unwrap();
586 schema_catalog.get_index_by_id(index_id).unwrap().clone()
587 };
588
589 let index_table_id = index.index_table().id;
590 let (database_id, schema_id) = self.drop_table_or_index_id(index_id.as_raw_id());
591 self.catalog
592 .write()
593 .drop_index(database_id, schema_id, index_id);
594 self.catalog
595 .write()
596 .drop_table(database_id, schema_id, index_table_id);
597 Ok(())
598 }
599
600 async fn drop_function(&self, _function_id: FunctionId, _cascade: bool) -> Result<()> {
601 unreachable!()
602 }
603
604 async fn drop_connection(&self, _connection_id: ConnectionId, _cascade: bool) -> Result<()> {
605 unreachable!()
606 }
607
608 async fn drop_secret(&self, _secret_id: SecretId) -> Result<()> {
609 unreachable!()
610 }
611
612 async fn drop_database(&self, database_id: DatabaseId) -> Result<()> {
613 self.catalog.write().drop_database(database_id);
614 Ok(())
615 }
616
617 async fn drop_schema(&self, schema_id: SchemaId, _cascade: bool) -> Result<()> {
618 let database_id = self.drop_schema_id(schema_id);
619 self.catalog.write().drop_schema(database_id, schema_id);
620 Ok(())
621 }
622
623 async fn alter_name(
624 &self,
625 object_id: alter_name_request::Object,
626 object_name: &str,
627 ) -> Result<()> {
628 match object_id {
629 alter_name_request::Object::TableId(table_id) => {
630 self.catalog
631 .write()
632 .alter_table_name_by_id(table_id.into(), object_name);
633 Ok(())
634 }
635 _ => {
636 unimplemented!()
637 }
638 }
639 }
640
641 async fn alter_source(&self, source: PbSource) -> Result<()> {
642 self.catalog.write().update_source(&source);
643 Ok(())
644 }
645
646 async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<()> {
647 for database in self.catalog.read().iter_databases() {
648 for schema in database.iter_schemas() {
649 match object {
650 Object::TableId(table_id) => {
651 if let Some(table) = schema.get_created_table_by_id(TableId::from(table_id))
652 {
653 let mut pb_table = table.to_prost();
654 pb_table.owner = owner_id;
655 self.catalog.write().update_table(&pb_table);
656 return Ok(());
657 }
658 }
659 _ => unreachable!(),
660 }
661 }
662 }
663
664 Err(ErrorCode::ItemNotFound(format!("object not found: {:?}", object)).into())
665 }
666
667 async fn alter_set_schema(
668 &self,
669 object: alter_set_schema_request::Object,
670 new_schema_id: SchemaId,
671 ) -> Result<()> {
672 match object {
673 alter_set_schema_request::Object::TableId(table_id) => {
674 let mut pb_table = {
675 let reader = self.catalog.read();
676 let table = reader.get_any_table_by_id(table_id.into())?.to_owned();
677 table.to_prost()
678 };
679 pb_table.schema_id = new_schema_id;
680 self.catalog.write().update_table(&pb_table);
681 self.table_id_to_schema_id
682 .write()
683 .insert(table_id, new_schema_id);
684 Ok(())
685 }
686 _ => unreachable!(),
687 }
688 }
689
690 async fn alter_parallelism(
691 &self,
692 _job_id: JobId,
693 _parallelism: PbTableParallelism,
694 _deferred: bool,
695 ) -> Result<()> {
696 todo!()
697 }
698
699 async fn alter_config(
700 &self,
701 _job_id: JobId,
702 _entries_to_add: HashMap<String, String>,
703 _keys_to_remove: Vec<String>,
704 ) -> Result<()> {
705 todo!()
706 }
707
708 async fn alter_swap_rename(&self, _object: alter_swap_rename_request::Object) -> Result<()> {
709 todo!()
710 }
711
712 async fn alter_secret(
713 &self,
714 _secret_id: SecretId,
715 _secret_name: String,
716 _database_id: DatabaseId,
717 _schema_id: SchemaId,
718 _owner_id: u32,
719 _payload: Vec<u8>,
720 ) -> Result<()> {
721 unreachable!()
722 }
723
724 async fn alter_resource_group(
725 &self,
726 _table_id: TableId,
727 _resource_group: Option<String>,
728 _deferred: bool,
729 ) -> Result<()> {
730 todo!()
731 }
732
733 async fn alter_database_param(
734 &self,
735 database_id: DatabaseId,
736 param: AlterDatabaseParam,
737 ) -> Result<()> {
738 let mut pb_database = {
739 let reader = self.catalog.read();
740 let database = reader.get_database_by_id(database_id)?.to_owned();
741 database.to_prost()
742 };
743 match param {
744 AlterDatabaseParam::BarrierIntervalMs(interval) => {
745 pb_database.barrier_interval_ms = interval;
746 }
747 AlterDatabaseParam::CheckpointFrequency(frequency) => {
748 pb_database.checkpoint_frequency = frequency;
749 }
750 }
751 self.catalog.write().update_database(&pb_database);
752 Ok(())
753 }
754
755 async fn create_iceberg_table(
756 &self,
757 _table_job_info: PbTableJobInfo,
758 _sink_job_info: PbSinkJobInfo,
759 _iceberg_source: PbSource,
760 _if_not_exists: bool,
761 ) -> Result<()> {
762 todo!()
763 }
764}
765
766impl MockCatalogWriter {
767 pub fn new(
768 catalog: Arc<RwLock<Catalog>>,
769 hummock_snapshot_manager: HummockSnapshotManagerRef,
770 ) -> Self {
771 catalog.write().create_database(&PbDatabase {
772 id: 0.into(),
773 name: DEFAULT_DATABASE_NAME.to_owned(),
774 owner: DEFAULT_SUPER_USER_ID,
775 resource_group: DEFAULT_RESOURCE_GROUP.to_owned(),
776 barrier_interval_ms: None,
777 checkpoint_frequency: None,
778 });
779 catalog.write().create_schema(&PbSchema {
780 id: 1.into(),
781 name: DEFAULT_SCHEMA_NAME.to_owned(),
782 database_id: 0.into(),
783 owner: DEFAULT_SUPER_USER_ID,
784 });
785 catalog.write().create_schema(&PbSchema {
786 id: 2.into(),
787 name: PG_CATALOG_SCHEMA_NAME.to_owned(),
788 database_id: 0.into(),
789 owner: DEFAULT_SUPER_USER_ID,
790 });
791 catalog.write().create_schema(&PbSchema {
792 id: 3.into(),
793 name: RW_CATALOG_SCHEMA_NAME.to_owned(),
794 database_id: 0.into(),
795 owner: DEFAULT_SUPER_USER_ID,
796 });
797 let mut map: HashMap<SchemaId, DatabaseId> = HashMap::new();
798 map.insert(1_u32.into(), 0_u32.into());
799 map.insert(2_u32.into(), 0_u32.into());
800 map.insert(3_u32.into(), 0_u32.into());
801 Self {
802 catalog,
803 id: AtomicU32::new(3),
804 table_id_to_schema_id: Default::default(),
805 schema_id_to_database_id: RwLock::new(map),
806 hummock_snapshot_manager,
807 }
808 }
809
810 fn gen_id<T: From<u32>>(&self) -> T {
811 (self.id.fetch_add(1, Ordering::SeqCst) + 1).into()
813 }
814
815 fn add_table_or_source_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
816 self.table_id_to_schema_id
817 .write()
818 .insert(table_id, schema_id);
819 }
820
821 fn drop_table_or_source_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
822 let schema_id = self
823 .table_id_to_schema_id
824 .write()
825 .remove(&table_id)
826 .unwrap();
827 (self.get_database_id_by_schema(schema_id), schema_id)
828 }
829
830 fn add_table_or_sink_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
831 self.table_id_to_schema_id
832 .write()
833 .insert(table_id, schema_id);
834 }
835
836 fn add_table_or_subscription_id(
837 &self,
838 table_id: u32,
839 schema_id: SchemaId,
840 _database_id: DatabaseId,
841 ) {
842 self.table_id_to_schema_id
843 .write()
844 .insert(table_id, schema_id);
845 }
846
847 fn add_table_or_index_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
848 self.table_id_to_schema_id
849 .write()
850 .insert(table_id, schema_id);
851 }
852
853 fn drop_table_or_sink_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
854 let schema_id = self
855 .table_id_to_schema_id
856 .write()
857 .remove(&table_id)
858 .unwrap();
859 (self.get_database_id_by_schema(schema_id), schema_id)
860 }
861
862 fn drop_table_or_subscription_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
863 let schema_id = self
864 .table_id_to_schema_id
865 .write()
866 .remove(&table_id)
867 .unwrap();
868 (self.get_database_id_by_schema(schema_id), schema_id)
869 }
870
871 fn drop_table_or_index_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
872 let schema_id = self
873 .table_id_to_schema_id
874 .write()
875 .remove(&table_id)
876 .unwrap();
877 (self.get_database_id_by_schema(schema_id), schema_id)
878 }
879
880 fn add_schema_id(&self, schema_id: SchemaId, database_id: DatabaseId) {
881 self.schema_id_to_database_id
882 .write()
883 .insert(schema_id, database_id);
884 }
885
886 fn drop_schema_id(&self, schema_id: SchemaId) -> DatabaseId {
887 self.schema_id_to_database_id
888 .write()
889 .remove(&schema_id)
890 .unwrap()
891 }
892
893 fn create_source_inner(&self, mut source: PbSource) -> Result<SourceId> {
894 source.id = self.gen_id();
895 self.catalog.write().create_source(&source);
896 self.add_table_or_source_id(source.id.as_raw_id(), source.schema_id, source.database_id);
897 Ok(source.id)
898 }
899
900 fn create_sink_inner(&self, mut sink: PbSink, _graph: StreamFragmentGraph) -> Result<()> {
901 sink.id = self.gen_id();
902 sink.stream_job_status = PbStreamJobStatus::Created as _;
903 self.catalog.write().create_sink(&sink);
904 self.add_table_or_sink_id(sink.id.as_raw_id(), sink.schema_id, sink.database_id);
905 Ok(())
906 }
907
908 fn create_subscription_inner(&self, mut subscription: PbSubscription) -> Result<()> {
909 subscription.id = self.gen_id();
910 self.catalog.write().create_subscription(&subscription);
911 self.add_table_or_subscription_id(
912 subscription.id.as_raw_id(),
913 subscription.schema_id,
914 subscription.database_id,
915 );
916 Ok(())
917 }
918
919 fn get_database_id_by_schema(&self, schema_id: SchemaId) -> DatabaseId {
920 *self
921 .schema_id_to_database_id
922 .read()
923 .get(&schema_id)
924 .unwrap()
925 }
926}
927
928pub struct MockUserInfoWriter {
929 id: AtomicU32,
930 user_info: Arc<RwLock<UserInfoManager>>,
931}
932
933#[async_trait::async_trait]
934impl UserInfoWriter for MockUserInfoWriter {
935 async fn create_user(&self, user: UserInfo) -> Result<()> {
936 let mut user = user;
937 user.id = self.gen_id();
938 self.user_info.write().create_user(user);
939 Ok(())
940 }
941
942 async fn drop_user(&self, id: UserId) -> Result<()> {
943 self.user_info.write().drop_user(id);
944 Ok(())
945 }
946
947 async fn update_user(
948 &self,
949 update_user: UserInfo,
950 update_fields: Vec<UpdateField>,
951 ) -> Result<()> {
952 let mut lock = self.user_info.write();
953 let id = update_user.get_id();
954 let Some(old_name) = lock.get_user_name_by_id(id) else {
955 return Ok(());
956 };
957 let mut user_info = lock.get_user_by_name(&old_name).unwrap().to_prost();
958 update_fields.into_iter().for_each(|field| match field {
959 UpdateField::Super => user_info.is_super = update_user.is_super,
960 UpdateField::Login => user_info.can_login = update_user.can_login,
961 UpdateField::CreateDb => user_info.can_create_db = update_user.can_create_db,
962 UpdateField::CreateUser => user_info.can_create_user = update_user.can_create_user,
963 UpdateField::AuthInfo => user_info.auth_info.clone_from(&update_user.auth_info),
964 UpdateField::Rename => user_info.name.clone_from(&update_user.name),
965 UpdateField::Admin => user_info.is_admin = update_user.is_admin,
966 UpdateField::Unspecified => unreachable!(),
967 });
968 lock.update_user(update_user);
969 Ok(())
970 }
971
972 async fn grant_privilege(
975 &self,
976 users: Vec<UserId>,
977 privileges: Vec<GrantPrivilege>,
978 with_grant_option: bool,
979 _grantor: UserId,
980 ) -> Result<()> {
981 let privileges = privileges
982 .into_iter()
983 .map(|mut p| {
984 p.action_with_opts
985 .iter_mut()
986 .for_each(|ao| ao.with_grant_option = with_grant_option);
987 p
988 })
989 .collect::<Vec<_>>();
990 for user_id in users {
991 if let Some(u) = self.user_info.write().get_user_mut(user_id) {
992 u.extend_privileges(privileges.clone());
993 }
994 }
995 Ok(())
996 }
997
998 async fn revoke_privilege(
1001 &self,
1002 users: Vec<UserId>,
1003 privileges: Vec<GrantPrivilege>,
1004 _granted_by: UserId,
1005 _revoke_by: UserId,
1006 revoke_grant_option: bool,
1007 _cascade: bool,
1008 ) -> Result<()> {
1009 for user_id in users {
1010 if let Some(u) = self.user_info.write().get_user_mut(user_id) {
1011 u.revoke_privileges(privileges.clone(), revoke_grant_option);
1012 }
1013 }
1014 Ok(())
1015 }
1016
1017 async fn alter_default_privilege(
1018 &self,
1019 _users: Vec<UserId>,
1020 _database_id: DatabaseId,
1021 _schemas: Vec<SchemaId>,
1022 _operation: AlterDefaultPrivilegeOperation,
1023 _operated_by: UserId,
1024 ) -> Result<()> {
1025 todo!()
1026 }
1027}
1028
1029impl MockUserInfoWriter {
1030 pub fn new(user_info: Arc<RwLock<UserInfoManager>>) -> Self {
1031 user_info.write().create_user(UserInfo {
1032 id: DEFAULT_SUPER_USER_ID,
1033 name: DEFAULT_SUPER_USER.to_owned(),
1034 is_super: true,
1035 can_create_db: true,
1036 can_create_user: true,
1037 can_login: true,
1038 ..Default::default()
1039 });
1040 user_info.write().create_user(UserInfo {
1041 id: DEFAULT_SUPER_USER_FOR_ADMIN_ID,
1042 name: DEFAULT_SUPER_USER_FOR_ADMIN.to_owned(),
1043 is_super: true,
1044 can_create_db: true,
1045 can_create_user: true,
1046 can_login: true,
1047 is_admin: true,
1048 ..Default::default()
1049 });
1050 Self {
1051 user_info,
1052 id: AtomicU32::new(NON_RESERVED_USER_ID as u32),
1053 }
1054 }
1055
1056 fn gen_id(&self) -> u32 {
1057 self.id.fetch_add(1, Ordering::SeqCst)
1058 }
1059}
1060
/// Field-less stand-in for the meta client; it carries no state of its own.
pub struct MockFrontendMetaClient {}
1062
1063#[async_trait::async_trait]
1064impl FrontendMetaClient for MockFrontendMetaClient {
1065 async fn try_unregister(&self) {}
1066
1067 async fn flush(&self, _database_id: DatabaseId) -> RpcResult<HummockVersionId> {
1068 Ok(INVALID_VERSION_ID)
1069 }
1070
1071 async fn wait(&self) -> RpcResult<()> {
1072 Ok(())
1073 }
1074
1075 async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult<Vec<u32>> {
1076 Ok(vec![])
1077 }
1078
1079 async fn list_table_fragments(
1080 &self,
1081 _table_ids: &[JobId],
1082 ) -> RpcResult<HashMap<JobId, TableFragmentInfo>> {
1083 Ok(HashMap::default())
1084 }
1085
1086 async fn list_streaming_job_states(&self) -> RpcResult<Vec<StreamingJobState>> {
1087 Ok(vec![])
1088 }
1089
1090 async fn list_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
1091 Ok(vec![])
1092 }
1093
1094 async fn list_creating_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
1095 Ok(vec![])
1096 }
1097
1098 async fn list_actor_states(&self) -> RpcResult<Vec<ActorState>> {
1099 Ok(vec![])
1100 }
1101
1102 async fn list_actor_splits(&self) -> RpcResult<Vec<ActorSplit>> {
1103 Ok(vec![])
1104 }
1105
1106 async fn list_object_dependencies(&self) -> RpcResult<Vec<PbObjectDependencies>> {
1107 Ok(vec![])
1108 }
1109
1110 async fn list_meta_snapshots(&self) -> RpcResult<Vec<MetaSnapshotMetadata>> {
1111 Ok(vec![])
1112 }
1113
1114 async fn set_system_param(
1115 &self,
1116 _param: String,
1117 _value: Option<String>,
1118 ) -> RpcResult<Option<SystemParamsReader>> {
1119 Ok(Some(SystemParams::default().into()))
1120 }
1121
1122 async fn get_session_params(&self) -> RpcResult<SessionConfig> {
1123 Ok(Default::default())
1124 }
1125
1126 async fn set_session_param(&self, _param: String, _value: Option<String>) -> RpcResult<String> {
1127 Ok("".to_owned())
1128 }
1129
1130 async fn get_ddl_progress(&self) -> RpcResult<Vec<DdlProgress>> {
1131 Ok(vec![])
1132 }
1133
1134 async fn get_tables(
1135 &self,
1136 _table_ids: Vec<crate::catalog::TableId>,
1137 _include_dropped_tables: bool,
1138 ) -> RpcResult<HashMap<crate::catalog::TableId, Table>> {
1139 Ok(HashMap::new())
1140 }
1141
1142 async fn list_hummock_pinned_versions(&self) -> RpcResult<Vec<(WorkerId, u64)>> {
1143 unimplemented!()
1144 }
1145
1146 async fn list_refresh_table_states(&self) -> RpcResult<Vec<RefreshTableState>> {
1147 unimplemented!()
1148 }
1149
1150 async fn get_hummock_current_version(&self) -> RpcResult<HummockVersion> {
1151 Ok(HummockVersion::default())
1152 }
1153
1154 async fn get_hummock_checkpoint_version(&self) -> RpcResult<HummockVersion> {
1155 unimplemented!()
1156 }
1157
1158 async fn list_version_deltas(&self) -> RpcResult<Vec<HummockVersionDelta>> {
1159 unimplemented!()
1160 }
1161
1162 async fn list_branched_objects(&self) -> RpcResult<Vec<BranchedObject>> {
1163 unimplemented!()
1164 }
1165
1166 async fn list_hummock_compaction_group_configs(&self) -> RpcResult<Vec<CompactionGroupInfo>> {
1167 unimplemented!()
1168 }
1169
1170 async fn list_hummock_active_write_limits(&self) -> RpcResult<HashMap<u64, WriteLimit>> {
1171 unimplemented!()
1172 }
1173
1174 async fn list_hummock_meta_configs(&self) -> RpcResult<HashMap<String, String>> {
1175 unimplemented!()
1176 }
1177
1178 async fn list_event_log(&self) -> RpcResult<Vec<EventLog>> {
1179 Ok(vec![])
1180 }
1181
1182 async fn list_compact_task_assignment(&self) -> RpcResult<Vec<CompactTaskAssignment>> {
1183 unimplemented!()
1184 }
1185
1186 async fn list_all_nodes(&self) -> RpcResult<Vec<WorkerNode>> {
1187 Ok(vec![])
1188 }
1189
1190 async fn list_compact_task_progress(&self) -> RpcResult<Vec<CompactTaskProgress>> {
1191 unimplemented!()
1192 }
1193
1194 async fn recover(&self) -> RpcResult<()> {
1195 unimplemented!()
1196 }
1197
1198 async fn apply_throttle(
1199 &self,
1200 _throttle_target: PbThrottleTarget,
1201 _throttle_type: risingwave_pb::common::PbThrottleType,
1202 _id: u32,
1203 _rate_limit: Option<u32>,
1204 ) -> RpcResult<()> {
1205 unimplemented!()
1206 }
1207
1208 async fn alter_fragment_parallelism(
1209 &self,
1210 _fragment_ids: Vec<FragmentId>,
1211 _parallelism: Option<PbTableParallelism>,
1212 ) -> RpcResult<()> {
1213 unimplemented!()
1214 }
1215
1216 async fn get_cluster_recovery_status(&self) -> RpcResult<RecoveryStatus> {
1217 Ok(RecoveryStatus::StatusRunning)
1218 }
1219
1220 async fn get_cluster_limits(&self) -> RpcResult<Vec<ClusterLimit>> {
1221 Ok(vec![])
1222 }
1223
1224 async fn list_rate_limits(&self) -> RpcResult<Vec<RateLimitInfo>> {
1225 Ok(vec![])
1226 }
1227
1228 async fn list_cdc_progress(&self) -> RpcResult<HashMap<JobId, PbCdcProgress>> {
1229 Ok(HashMap::default())
1230 }
1231
1232 async fn get_meta_store_endpoint(&self) -> RpcResult<String> {
1233 unimplemented!()
1234 }
1235
1236 async fn alter_sink_props(
1237 &self,
1238 _sink_id: SinkId,
1239 _changed_props: BTreeMap<String, String>,
1240 _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1241 _connector_conn_ref: Option<ConnectionId>,
1242 ) -> RpcResult<()> {
1243 unimplemented!()
1244 }
1245
1246 async fn alter_iceberg_table_props(
1247 &self,
1248 _table_id: TableId,
1249 _sink_id: SinkId,
1250 _source_id: SourceId,
1251 _changed_props: BTreeMap<String, String>,
1252 _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1253 _connector_conn_ref: Option<ConnectionId>,
1254 ) -> RpcResult<()> {
1255 unimplemented!()
1256 }
1257
1258 async fn alter_source_connector_props(
1259 &self,
1260 _source_id: SourceId,
1261 _changed_props: BTreeMap<String, String>,
1262 _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1263 _connector_conn_ref: Option<ConnectionId>,
1264 ) -> RpcResult<()> {
1265 unimplemented!()
1266 }
1267
1268 async fn alter_connection_connector_props(
1269 &self,
1270 _connection_id: u32,
1271 _changed_props: BTreeMap<String, String>,
1272 _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1273 ) -> RpcResult<()> {
1274 Ok(())
1275 }
1276
1277 async fn list_hosted_iceberg_tables(&self) -> RpcResult<Vec<IcebergTable>> {
1278 unimplemented!()
1279 }
1280
1281 async fn get_fragment_by_id(
1282 &self,
1283 _fragment_id: FragmentId,
1284 ) -> RpcResult<Option<FragmentDistribution>> {
1285 unimplemented!()
1286 }
1287
1288 async fn get_fragment_vnodes(
1289 &self,
1290 _fragment_id: FragmentId,
1291 ) -> RpcResult<Vec<(ActorId, Vec<u32>)>> {
1292 unimplemented!()
1293 }
1294
1295 async fn get_actor_vnodes(&self, _actor_id: ActorId) -> RpcResult<Vec<u32>> {
1296 unimplemented!()
1297 }
1298
1299 fn worker_id(&self) -> WorkerId {
1300 0.into()
1301 }
1302
1303 async fn set_sync_log_store_aligned(&self, _job_id: JobId, _aligned: bool) -> RpcResult<()> {
1304 Ok(())
1305 }
1306
1307 async fn compact_iceberg_table(&self, _sink_id: SinkId) -> RpcResult<u64> {
1308 Ok(1)
1309 }
1310
1311 async fn expire_iceberg_table_snapshots(&self, _sink_id: SinkId) -> RpcResult<()> {
1312 Ok(())
1313 }
1314
1315 async fn refresh(&self, _request: RefreshRequest) -> RpcResult<RefreshResponse> {
1316 Ok(RefreshResponse { status: None })
1317 }
1318
1319 fn cluster_id(&self) -> &str {
1320 "test-cluster-uuid"
1321 }
1322
1323 async fn list_unmigrated_tables(&self) -> RpcResult<HashMap<crate::catalog::TableId, String>> {
1324 unimplemented!()
1325 }
1326}
1327
/// `proto3` schema text (package `test`), compiled only in test builds.
///
/// Declares five messages:
/// - `TestRecord`: `int32 id`, nested `Country`, `int64 zipcode`, `float rate`.
/// - `TestRecordAlterType`: same field numbers as `TestRecord`, but `id` is a `string` and
///   `zipcode` is an `int32`.
/// - `TestRecordExt`: `TestRecord` plus an extra `string name = 6`.
/// - `Country` and `City`: nested address messages (`Country` embeds a `City`).
#[cfg(test)]
pub static PROTO_FILE_DATA: &str = r#"
 syntax = "proto3";
 package test;
 message TestRecord {
 int32 id = 1;
 Country country = 3;
 int64 zipcode = 4;
 float rate = 5;
 }
 message TestRecordAlterType {
 string id = 1;
 Country country = 3;
 int32 zipcode = 4;
 float rate = 5;
 }
 message TestRecordExt {
 int32 id = 1;
 Country country = 3;
 int64 zipcode = 4;
 float rate = 5;
 string name = 6;
 }
 message Country {
 string address = 1;
 City city = 2;
 string zipcode = 3;
 }
 message City {
 string address = 1;
 string zipcode = 2;
 }"#;
1360
1361pub fn create_proto_file(proto_data: &str) -> NamedTempFile {
1364 let in_file = Builder::new()
1365 .prefix("temp")
1366 .suffix(".proto")
1367 .rand_bytes(8)
1368 .tempfile()
1369 .unwrap();
1370
1371 let out_file = Builder::new()
1372 .prefix("temp")
1373 .suffix(".pb")
1374 .rand_bytes(8)
1375 .tempfile()
1376 .unwrap();
1377
1378 let mut file = in_file.as_file();
1379 file.write_all(proto_data.as_ref())
1380 .expect("writing binary to test file");
1381 file.flush().expect("flush temp file failed");
1382 let include_path = in_file
1383 .path()
1384 .parent()
1385 .unwrap()
1386 .to_string_lossy()
1387 .into_owned();
1388 let out_path = out_file.path().to_string_lossy().into_owned();
1389 let in_path = in_file.path().to_string_lossy().into_owned();
1390 let mut compile = std::process::Command::new("protoc");
1391
1392 let out = compile
1393 .arg("--include_imports")
1394 .arg("-I")
1395 .arg(include_path)
1396 .arg(format!("--descriptor_set_out={}", out_path))
1397 .arg(in_path)
1398 .output()
1399 .expect("failed to compile proto");
1400 if !out.status.success() {
1401 panic!("compile proto failed \n output: {:?}", out);
1402 }
1403 out_file
1404}