use std::collections::{BTreeMap, HashMap, HashSet};
use std::io::Write;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};

use futures_async_stream::for_await;
use parking_lot::RwLock;
use pgwire::net::{Address, AddressRef};
use pgwire::pg_response::StatementType;
use pgwire::pg_server::{SessionId, SessionManager, UserAuthenticator};
use pgwire::types::Row;
use risingwave_common::catalog::{
    AlterDatabaseParam, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER,
    DEFAULT_SUPER_USER_FOR_ADMIN, DEFAULT_SUPER_USER_FOR_ADMIN_ID, DEFAULT_SUPER_USER_ID,
    FunctionId, IndexId, NON_RESERVED_USER_ID, ObjectId, PG_CATALOG_SCHEMA_NAME,
    RW_CATALOG_SCHEMA_NAME, TableId,
};
use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat};
use risingwave_common::id::{ConnectionId, JobId, SourceId, SubscriptionId, ViewId, WorkerId};
use risingwave_common::session_config::SessionConfig;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::util::cluster_limit::ClusterLimit;
use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
use risingwave_hummock_sdk::change_log::TableChangeLogs;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId, INVALID_VERSION_ID};
use risingwave_pb::backup_service::MetaSnapshotMetadata;
use risingwave_pb::catalog::{
    PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbStreamJobStatus,
    PbSubscription, PbTable, PbView, Table,
};
use risingwave_pb::common::{PbObjectType, WorkerNode};
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
use risingwave_pb::ddl_service::{
    DdlProgress, PbTableJobType, TableJobType, alter_name_request, alter_set_schema_request,
    alter_swap_rename_request, create_connection_request, streaming_job_resource_type,
};
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::{
    BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
};
use risingwave_pb::id::ActorId;
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
use risingwave_pb::meta::list_iceberg_compaction_status_response::IcebergCompactionStatus;
use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::{
    EventLog, FragmentDistribution, ObjectDependency as PbObjectDependency, PbTableParallelism,
    PbThrottleTarget, RecoveryStatus, RefreshRequest, RefreshResponse, SystemParams,
    list_sink_log_store_tables_response,
};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
use risingwave_pb::user::update_user_request::UpdateField;
use risingwave_pb::user::{GrantPrivilege, UserInfo};
use risingwave_rpc_client::error::Result as RpcResult;
use tempfile::{Builder, NamedTempFile};

use crate::FrontendOpts;
use crate::catalog::catalog_service::CatalogWriter;
use crate::catalog::root_catalog::Catalog;
use crate::catalog::{DatabaseId, FragmentId, SchemaId, SecretId, SinkId};
use crate::error::{ErrorCode, Result, RwError};
use crate::handler::RwPgResponse;
use crate::meta_client::FrontendMetaClient;
use crate::scheduler::HummockSnapshotManagerRef;
use crate::session::{AuthContext, FrontendEnv, SessionImpl};
use crate::user::UserId;
use crate::user::user_manager::UserInfoManager;
use crate::user::user_service::UserInfoWriter;

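/// An in-process frontend for unit tests. It wraps a mocked [`FrontendEnv`]
/// and provides helpers to run SQL statements without a meta service or a
/// network stack.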
pub struct LocalFrontend {
    pub opts: FrontendOpts,
    env: FrontendEnv,
}

impl SessionManager for LocalFrontend {
    type Error = RwError;
    type Session = SessionImpl;

    fn create_dummy_session(
        &self,
        _database_id: DatabaseId,
    ) -> std::result::Result<Arc<Self::Session>, Self::Error> {
        unreachable!()
    }

    fn connect(
        &self,
        _database: &str,
        _user_name: &str,
        _peer_addr: AddressRef,
    ) -> std::result::Result<Arc<Self::Session>, Self::Error> {
        Ok(self.session_ref())
    }

    fn cancel_queries_in_session(&self, _session_id: SessionId) {
        unreachable!()
    }

    fn cancel_creating_jobs_in_session(&self, _session_id: SessionId) {
        unreachable!()
    }

    fn end_session(&self, _session: &Self::Session) {
        unreachable!()
    }
}

impl LocalFrontend {
    #[expect(clippy::unused_async)]
    pub async fn new(opts: FrontendOpts) -> Self {
        let env = FrontendEnv::mock();
        Self { opts, env }
    }

    pub async fn run_sql(
        &self,
        sql: impl Into<String>,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        Box::pin(self.session_ref().run_statement(sql, vec![])).await
    }

    pub async fn run_sql_with_session(
        &self,
        session_ref: Arc<SessionImpl>,
        sql: impl Into<String>,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        Box::pin(session_ref.run_statement(sql, vec![])).await
    }

    pub async fn run_user_sql(
        &self,
        sql: impl Into<String>,
        database: String,
        user_name: String,
        user_id: UserId,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        Box::pin(
            self.session_user_ref(database, user_name, user_id)
                .run_statement(sql, vec![]),
        )
        .await
    }

    pub async fn query_formatted_result(&self, sql: impl Into<String>) -> Vec<String> {
        let mut rsp = self.run_sql(sql).await.unwrap();
        let mut res = vec![];
        #[for_await]
        for row_set in rsp.values_stream() {
            for row in row_set.unwrap() {
                res.push(format!("{:?}", row));
            }
        }
        res
    }

    pub async fn get_explain_output(&self, sql: impl Into<String>) -> String {
        let mut rsp = self.run_sql(sql).await.unwrap();
        assert_eq!(rsp.stmt_type(), StatementType::EXPLAIN);
        let mut res = String::new();
        #[for_await]
        for row_set in rsp.values_stream() {
            for row in row_set.unwrap() {
                let row: Row = row;
                let row = row.values()[0].as_ref().unwrap();
                res += std::str::from_utf8(row).unwrap();
                res += "\n";
            }
        }
        res
    }

    pub fn session_ref(&self) -> Arc<SessionImpl> {
        self.session_user_ref(
            DEFAULT_DATABASE_NAME.to_owned(),
            DEFAULT_SUPER_USER.to_owned(),
            DEFAULT_SUPER_USER_ID,
        )
    }

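    /// Creates a session for the given database and user. The session is
    /// authenticated with [`UserAuthenticator::None`] and bound to a dummy
    /// peer address (127.0.0.1:6666).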
    pub fn session_user_ref(
        &self,
        database: String,
        user_name: String,
        user_id: UserId,
    ) -> Arc<SessionImpl> {
        Arc::new(SessionImpl::new(
            self.env.clone(),
            AuthContext::new(database, user_name, user_id),
            UserAuthenticator::None,
            (0, 0),
            Address::Tcp(SocketAddr::new(
                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
                6666,
            ))
            .into(),
            Default::default(),
        ))
    }
}

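/// Collects the output of an `EXPLAIN` response into a single
/// newline-separated string; panics if the response is not an `EXPLAIN`
/// result.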
pub async fn get_explain_output(mut rsp: RwPgResponse) -> String {
    if rsp.stmt_type() != StatementType::EXPLAIN {
        panic!("RESPONSE INVALID: {rsp:?}");
    }
    let mut res = String::new();
    #[for_await]
    for row_set in rsp.values_stream() {
        for row in row_set.unwrap() {
            let row: Row = row;
            let row = row.values()[0].as_ref().unwrap();
            res += std::str::from_utf8(row).unwrap();
            res += "\n";
        }
    }
    res
}

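/// An in-memory [`CatalogWriter`] for tests. IDs are drawn from a local atomic
/// counter, and object-to-schema and schema-to-database mappings are tracked
/// in side tables so that drops can resolve their parents without a meta
/// service.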
pub struct MockCatalogWriter {
    catalog: Arc<RwLock<Catalog>>,
    id: AtomicU32,
    table_id_to_schema_id: RwLock<HashMap<u32, SchemaId>>,
    schema_id_to_database_id: RwLock<HashMap<SchemaId, DatabaseId>>,
    hummock_snapshot_manager: HummockSnapshotManagerRef,
}

#[async_trait::async_trait]
impl CatalogWriter for MockCatalogWriter {
    async fn create_database(
        &self,
        db_name: &str,
        owner: UserId,
        resource_group: &str,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
    ) -> Result<()> {
        let database_id = DatabaseId::new(self.gen_id());
        self.catalog.write().create_database(&PbDatabase {
            name: db_name.to_owned(),
            id: database_id,
            owner,
            resource_group: resource_group.to_owned(),
            barrier_interval_ms,
            checkpoint_frequency,
        });
        self.create_schema(database_id, DEFAULT_SCHEMA_NAME, owner)
            .await?;
        self.create_schema(database_id, PG_CATALOG_SCHEMA_NAME, owner)
            .await?;
        self.create_schema(database_id, RW_CATALOG_SCHEMA_NAME, owner)
            .await?;
        Ok(())
    }

    async fn create_schema(
        &self,
        db_id: DatabaseId,
        schema_name: &str,
        owner: UserId,
    ) -> Result<()> {
        let id = self.gen_id();
        self.catalog.write().create_schema(&PbSchema {
            id,
            name: schema_name.to_owned(),
            database_id: db_id,
            owner,
        });
        self.add_schema_id(id, db_id);
        Ok(())
    }

    async fn create_materialized_view(
        &self,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        _resource_type: streaming_job_resource_type::ResourceType,
        _if_not_exists: bool,
    ) -> Result<()> {
        table.id = self.gen_id();
        table.stream_job_status = PbStreamJobStatus::Created as _;
        table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
        self.catalog.write().create_table(&table);
        self.add_table_or_source_id(table.id.as_raw_id(), table.schema_id, table.database_id);
        self.insert_object_dependencies(table.id.as_object_id(), dependencies);
        self.hummock_snapshot_manager.add_table_for_test(table.id);
        Ok(())
    }

    async fn replace_materialized_view(
        &self,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
    ) -> Result<()> {
        table.stream_job_status = PbStreamJobStatus::Created as _;
        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
        self.catalog.write().update_table(&table);
        Ok(())
    }

    async fn create_view(&self, mut view: PbView, dependencies: HashSet<ObjectId>) -> Result<()> {
        view.id = self.gen_id();
        self.catalog.write().create_view(&view);
        self.add_table_or_source_id(view.id.as_raw_id(), view.schema_id, view.database_id);
        self.insert_object_dependencies(view.id.as_object_id(), dependencies);
        Ok(())
    }

    async fn create_table(
        &self,
        source: Option<PbSource>,
        mut table: PbTable,
        graph: StreamFragmentGraph,
        _job_type: PbTableJobType,
        if_not_exists: bool,
        dependencies: HashSet<ObjectId>,
    ) -> Result<()> {
        if let Some(source) = source {
            let source_id = self.create_source_inner(source)?;
            table.optional_associated_source_id = Some(source_id.into());
        }
        self.create_materialized_view(
            table,
            graph,
            dependencies,
            streaming_job_resource_type::ResourceType::Regular(true),
            if_not_exists,
        )
        .await?;
        Ok(())
    }

    async fn replace_table(
        &self,
        _source: Option<PbSource>,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
        _job_type: TableJobType,
    ) -> Result<()> {
        table.stream_job_status = PbStreamJobStatus::Created as _;
        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
        self.catalog.write().update_table(&table);
        Ok(())
    }

    async fn replace_source(&self, source: PbSource, _graph: StreamFragmentGraph) -> Result<()> {
        self.catalog.write().update_source(&source);
        Ok(())
    }

    async fn create_source(
        &self,
        source: PbSource,
        _graph: Option<StreamFragmentGraph>,
        _if_not_exists: bool,
    ) -> Result<()> {
        self.create_source_inner(source).map(|_| ())
    }

    async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        _if_not_exists: bool,
    ) -> Result<()> {
        let sink_id = self.create_sink_inner(sink, graph)?;
        self.insert_object_dependencies(sink_id.as_object_id(), dependencies);
        Ok(())
    }

    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
        self.create_subscription_inner(subscription)
    }

    async fn create_index(
        &self,
        mut index: PbIndex,
        mut index_table: PbTable,
        _graph: StreamFragmentGraph,
        _if_not_exists: bool,
    ) -> Result<()> {
        index_table.id = self.gen_id();
        index_table.stream_job_status = PbStreamJobStatus::Created as _;
        index_table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
        self.catalog.write().create_table(&index_table);
        self.add_table_or_index_id(
            index_table.id.as_raw_id(),
            index_table.schema_id,
            index_table.database_id,
        );

        index.id = index_table.id.as_raw_id().into();
        index.index_table_id = index_table.id;
        self.catalog.write().create_index(&index);
        Ok(())
    }

    async fn create_function(&self, _function: PbFunction) -> Result<()> {
        unreachable!()
    }

    async fn create_connection(
        &self,
        _connection_name: String,
        _database_id: DatabaseId,
        _schema_id: SchemaId,
        _owner_id: UserId,
        _connection: create_connection_request::Payload,
    ) -> Result<()> {
        unreachable!()
    }

    async fn create_secret(
        &self,
        _secret_name: String,
        _database_id: DatabaseId,
        _schema_id: SchemaId,
        _owner_id: UserId,
        _payload: Vec<u8>,
    ) -> Result<()> {
        unreachable!()
    }

    async fn comment_on(&self, _comment: PbComment) -> Result<()> {
        unreachable!()
    }

    async fn drop_table(
        &self,
        source_id: Option<SourceId>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        if let Some(source_id) = source_id {
            self.drop_table_or_source_id(source_id.as_raw_id());
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
        let indexes =
            self.catalog
                .read()
                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
        for index in indexes {
            self.drop_index(index.id, cascade).await?;
        }
        self.catalog
            .write()
            .drop_table(database_id, schema_id, table_id);
        if let Some(source_id) = source_id {
            self.catalog
                .write()
                .drop_source(database_id, schema_id, source_id);
        }
        Ok(())
    }

    async fn drop_view(&self, _view_id: ViewId, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
        let indexes =
            self.catalog
                .read()
                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
        for index in indexes {
            self.drop_index(index.id, cascade).await?;
        }
        self.catalog
            .write()
            .drop_table(database_id, schema_id, table_id);
        Ok(())
    }

    async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(source_id.as_raw_id());
        self.catalog
            .write()
            .drop_source(database_id, schema_id, source_id);
        Ok(())
    }

    async fn reset_source(&self, _source_id: SourceId) -> Result<()> {
        Ok(())
    }

    async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_sink_id(sink_id.as_raw_id());
        self.catalog
            .write()
            .drop_sink(database_id, schema_id, sink_id);
        Ok(())
    }

    async fn drop_subscription(
        &self,
        subscription_id: SubscriptionId,
        cascade: bool,
    ) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) =
            self.drop_table_or_subscription_id(subscription_id.as_raw_id());
        self.catalog
            .write()
            .drop_subscription(database_id, schema_id, subscription_id);
        Ok(())
    }

    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let &schema_id = self
            .table_id_to_schema_id
            .read()
            .get(&index_id.as_raw_id())
            .unwrap();
        let database_id = self.get_database_id_by_schema(schema_id);

        let index = {
            let catalog_reader = self.catalog.read();
            let schema_catalog = catalog_reader
                .get_schema_by_id(database_id, schema_id)
                .unwrap();
            schema_catalog.get_index_by_id(index_id).unwrap().clone()
        };

        let index_table_id = index.index_table().id;
        let (database_id, schema_id) = self.drop_table_or_index_id(index_id.as_raw_id());
        self.catalog
            .write()
            .drop_index(database_id, schema_id, index_id);
        self.catalog
            .write()
            .drop_table(database_id, schema_id, index_table_id);
        Ok(())
    }

    async fn drop_function(&self, _function_id: FunctionId, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_connection(&self, _connection_id: ConnectionId, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_secret(&self, _secret_id: SecretId, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_database(&self, database_id: DatabaseId) -> Result<()> {
        self.catalog.write().drop_database(database_id);
        Ok(())
    }

    async fn drop_schema(&self, schema_id: SchemaId, _cascade: bool) -> Result<()> {
        let database_id = self.drop_schema_id(schema_id);
        self.catalog.write().drop_schema(database_id, schema_id);
        Ok(())
    }

    async fn alter_name(
        &self,
        object_id: alter_name_request::Object,
        object_name: &str,
    ) -> Result<()> {
        match object_id {
            alter_name_request::Object::TableId(table_id) => {
                self.catalog
                    .write()
                    .alter_table_name_by_id(table_id, object_name);
                Ok(())
            }
            _ => {
                unimplemented!()
            }
        }
    }

    async fn alter_source(&self, source: PbSource) -> Result<()> {
        self.catalog.write().update_source(&source);
        Ok(())
    }

    async fn alter_owner(&self, object: Object, owner_id: UserId) -> Result<()> {
        // Locate the target under the read guard and let the guard drop at the
        // end of the statement before taking the write lock: the catalog
        // `RwLock` is not reentrant, so writing while still iterating under
        // the read guard would deadlock.
        let pb_table = match object {
            Object::TableId(table_id) => self
                .catalog
                .read()
                .iter_databases()
                .flat_map(|database| database.iter_schemas())
                .find_map(|schema| schema.get_created_table_by_id(TableId::from(table_id)))
                .map(|table| {
                    let mut pb_table = table.to_prost();
                    pb_table.owner = owner_id;
                    pb_table
                }),
            _ => unreachable!(),
        };
        if let Some(pb_table) = pb_table {
            self.catalog.write().update_table(&pb_table);
            return Ok(());
        }

        Err(ErrorCode::ItemNotFound(format!("object not found: {:?}", object)).into())
    }

    async fn alter_subscription_retention(
        &self,
        subscription_id: SubscriptionId,
        retention_seconds: u64,
        definition: String,
    ) -> Result<()> {
        // As in `alter_owner`, find the target under the read guard and only
        // take the write lock after the guard has been released.
        let pb_subscription = self
            .catalog
            .read()
            .iter_databases()
            .flat_map(|database| database.iter_schemas())
            .find_map(|schema| schema.get_subscription_by_id(subscription_id))
            .map(|subscription| {
                let mut pb_subscription = subscription.to_proto();
                pb_subscription.retention_seconds = retention_seconds;
                pb_subscription.definition = definition;
                pb_subscription
            });
        if let Some(pb_subscription) = pb_subscription {
            self.catalog.write().update_subscription(&pb_subscription);
            return Ok(());
        }

        Err(
            ErrorCode::ItemNotFound(format!("subscription not found: {:?}", subscription_id))
                .into(),
        )
    }

    async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> Result<()> {
        match object {
            alter_set_schema_request::Object::TableId(table_id) => {
                let mut pb_table = {
                    let reader = self.catalog.read();
                    let table = reader.get_any_table_by_id(table_id)?.to_owned();
                    table.to_prost()
                };
                pb_table.schema_id = new_schema_id;
                self.catalog.write().update_table(&pb_table);
                self.table_id_to_schema_id
                    .write()
                    .insert(table_id.as_raw_id(), new_schema_id);
                Ok(())
            }
            _ => unreachable!(),
        }
    }

    async fn alter_parallelism(
        &self,
        _job_id: JobId,
        _parallelism: PbTableParallelism,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }

    async fn alter_backfill_parallelism(
        &self,
        _job_id: JobId,
        _parallelism: Option<PbTableParallelism>,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }

    async fn alter_config(
        &self,
        _job_id: JobId,
        _entries_to_add: HashMap<String, String>,
        _keys_to_remove: Vec<String>,
    ) -> Result<()> {
        todo!()
    }

    async fn alter_swap_rename(&self, _object: alter_swap_rename_request::Object) -> Result<()> {
        todo!()
    }

    async fn alter_secret(
        &self,
        _secret_id: SecretId,
        _secret_name: String,
        _database_id: DatabaseId,
        _schema_id: SchemaId,
        _owner_id: UserId,
        _payload: Vec<u8>,
    ) -> Result<()> {
        unreachable!()
    }

    async fn alter_resource_group(
        &self,
        _table_id: TableId,
        _resource_group: Option<String>,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }

    async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> Result<()> {
        let mut pb_database = {
            let reader = self.catalog.read();
            let database = reader.get_database_by_id(database_id)?.to_owned();
            database.to_prost()
        };
        match param {
            AlterDatabaseParam::BarrierIntervalMs(interval) => {
                pb_database.barrier_interval_ms = interval;
            }
            AlterDatabaseParam::CheckpointFrequency(frequency) => {
                pb_database.checkpoint_frequency = frequency;
            }
        }
        self.catalog.write().update_database(&pb_database);
        Ok(())
    }

    async fn create_iceberg_table(
        &self,
        _table_job_info: PbTableJobInfo,
        _sink_job_info: PbSinkJobInfo,
        _iceberg_source: PbSource,
        _if_not_exists: bool,
    ) -> Result<()> {
        todo!()
    }

    async fn wait(&self) -> Result<()> {
        Ok(())
    }
}

impl MockCatalogWriter {
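    /// Seeds the catalog with the default database (id 0) and its three
    /// built-in schemas: `public` (id 1), `pg_catalog` (id 2) and
    /// `rw_catalog` (id 3); the ID counter starts after these reserved IDs.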
    pub fn new(
        catalog: Arc<RwLock<Catalog>>,
        hummock_snapshot_manager: HummockSnapshotManagerRef,
    ) -> Self {
        catalog.write().create_database(&PbDatabase {
            id: 0.into(),
            name: DEFAULT_DATABASE_NAME.to_owned(),
            owner: DEFAULT_SUPER_USER_ID,
            resource_group: DEFAULT_RESOURCE_GROUP.to_owned(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        });
        catalog.write().create_schema(&PbSchema {
            id: 1.into(),
            name: DEFAULT_SCHEMA_NAME.to_owned(),
            database_id: 0.into(),
            owner: DEFAULT_SUPER_USER_ID,
        });
        catalog.write().create_schema(&PbSchema {
            id: 2.into(),
            name: PG_CATALOG_SCHEMA_NAME.to_owned(),
            database_id: 0.into(),
            owner: DEFAULT_SUPER_USER_ID,
        });
        catalog.write().create_schema(&PbSchema {
            id: 3.into(),
            name: RW_CATALOG_SCHEMA_NAME.to_owned(),
            database_id: 0.into(),
            owner: DEFAULT_SUPER_USER_ID,
        });
        let mut map: HashMap<SchemaId, DatabaseId> = HashMap::new();
        map.insert(1_u32.into(), 0_u32.into());
        map.insert(2_u32.into(), 0_u32.into());
        map.insert(3_u32.into(), 0_u32.into());
        Self {
            catalog,
            id: AtomicU32::new(3),
            table_id_to_schema_id: Default::default(),
            schema_id_to_database_id: RwLock::new(map),
            hummock_snapshot_manager,
        }
    }

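    /// Allocates a fresh ID. `fetch_add` returns the pre-increment value, so
    /// the result is the post-increment counter; the first call yields 4,
    /// since the counter starts at 3 for the built-in schemas.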
    fn gen_id<T: From<u32>>(&self) -> T {
        (self.id.fetch_add(1, Ordering::SeqCst) + 1).into()
    }

    fn add_table_or_source_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn drop_table_or_source_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn add_table_or_sink_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn add_table_or_subscription_id(
        &self,
        table_id: u32,
        schema_id: SchemaId,
        _database_id: DatabaseId,
    ) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn add_table_or_index_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn drop_table_or_sink_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn drop_table_or_subscription_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn drop_table_or_index_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn add_schema_id(&self, schema_id: SchemaId, database_id: DatabaseId) {
        self.schema_id_to_database_id
            .write()
            .insert(schema_id, database_id);
    }

    fn drop_schema_id(&self, schema_id: SchemaId) -> DatabaseId {
        self.schema_id_to_database_id
            .write()
            .remove(&schema_id)
            .unwrap()
    }

    fn create_source_inner(&self, mut source: PbSource) -> Result<SourceId> {
        source.id = self.gen_id();
        self.catalog.write().create_source(&source);
        self.add_table_or_source_id(source.id.as_raw_id(), source.schema_id, source.database_id);
        Ok(source.id)
    }

    fn create_sink_inner(&self, mut sink: PbSink, _graph: StreamFragmentGraph) -> Result<SinkId> {
        sink.id = self.gen_id();
        sink.stream_job_status = PbStreamJobStatus::Created as _;
        self.catalog.write().create_sink(&sink);
        self.add_table_or_sink_id(sink.id.as_raw_id(), sink.schema_id, sink.database_id);
        Ok(sink.id)
    }

    fn create_subscription_inner(&self, mut subscription: PbSubscription) -> Result<()> {
        subscription.id = self.gen_id();
        self.catalog.write().create_subscription(&subscription);
        self.add_table_or_subscription_id(
            subscription.id.as_raw_id(),
            subscription.schema_id,
            subscription.database_id,
        );
        Ok(())
    }

    fn get_database_id_by_schema(&self, schema_id: SchemaId) -> DatabaseId {
        *self
            .schema_id_to_database_id
            .read()
            .get(&schema_id)
            .unwrap()
    }

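    /// Scans every schema to classify `object_id` as a table, materialized
    /// view, source, view, or index; falls back to `Unspecified` when the
    /// object cannot be found.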
    fn get_object_type(&self, object_id: ObjectId) -> PbObjectType {
        let catalog = self.catalog.read();
        for database in catalog.iter_databases() {
            for schema in database.iter_schemas() {
                if let Some(table) = schema.get_created_table_by_id(object_id.as_table_id()) {
                    return if table.is_mview() {
                        PbObjectType::Mview
                    } else {
                        PbObjectType::Table
                    };
                }
                if schema.get_source_by_id(object_id.as_source_id()).is_some() {
                    return PbObjectType::Source;
                }
                if schema.get_view_by_id(object_id.as_view_id()).is_some() {
                    return PbObjectType::View;
                }
                if schema.get_index_by_id(object_id.as_index_id()).is_some() {
                    return PbObjectType::Index;
                }
            }
        }
        PbObjectType::Unspecified
    }

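    /// Records a dependency edge from `object_id` to each referenced object,
    /// resolving each referenced object's type first; a no-op when there are
    /// no dependencies.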
    fn insert_object_dependencies(&self, object_id: ObjectId, dependencies: HashSet<ObjectId>) {
        if dependencies.is_empty() {
            return;
        }
        let dependencies = dependencies
            .into_iter()
            .map(|referenced_object_id| PbObjectDependency {
                object_id,
                referenced_object_id,
                referenced_object_type: self.get_object_type(referenced_object_id) as i32,
            })
            .collect();
        self.catalog
            .write()
            .insert_object_dependencies(dependencies);
    }
}

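/// An in-memory [`UserInfoWriter`] for tests, backed by a shared
/// [`UserInfoManager`].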
pub struct MockUserInfoWriter {
    id: AtomicU32,
    user_info: Arc<RwLock<UserInfoManager>>,
}

#[async_trait::async_trait]
impl UserInfoWriter for MockUserInfoWriter {
    async fn create_user(&self, mut user: UserInfo) -> Result<()> {
        user.id = self.gen_id().into();
        self.user_info.write().create_user(user);
        Ok(())
    }

    async fn drop_user(&self, id: UserId) -> Result<()> {
        self.user_info.write().drop_user(id);
        Ok(())
    }

    async fn update_user(
        &self,
        update_user: UserInfo,
        update_fields: Vec<UpdateField>,
    ) -> Result<()> {
        let mut lock = self.user_info.write();
        let id = update_user.get_id();
        let Some(old_name) = lock.get_user_name_by_id(id) else {
            return Ok(());
        };
        let mut user_info = lock.get_user_by_name(&old_name).unwrap().to_prost();
        update_fields.into_iter().for_each(|field| match field {
            UpdateField::Super => user_info.is_super = update_user.is_super,
            UpdateField::Login => user_info.can_login = update_user.can_login,
            UpdateField::CreateDb => user_info.can_create_db = update_user.can_create_db,
            UpdateField::CreateUser => user_info.can_create_user = update_user.can_create_user,
            UpdateField::AuthInfo => user_info.auth_info.clone_from(&update_user.auth_info),
            UpdateField::Rename => user_info.name.clone_from(&update_user.name),
            UpdateField::Admin => user_info.is_admin = update_user.is_admin,
            UpdateField::Unspecified => unreachable!(),
        });
        // Persist the merged user info rather than the raw request, so that
        // only the listed fields are updated.
        lock.update_user(user_info);
        Ok(())
    }

    async fn grant_privilege(
        &self,
        users: Vec<UserId>,
        privileges: Vec<GrantPrivilege>,
        with_grant_option: bool,
        _grantor: UserId,
    ) -> Result<()> {
        let privileges = privileges
            .into_iter()
            .map(|mut p| {
                p.action_with_opts
                    .iter_mut()
                    .for_each(|ao| ao.with_grant_option = with_grant_option);
                p
            })
            .collect::<Vec<_>>();
        for user_id in users {
            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
                u.extend_privileges(privileges.clone());
            }
        }
        Ok(())
    }

    async fn revoke_privilege(
        &self,
        users: Vec<UserId>,
        privileges: Vec<GrantPrivilege>,
        _granted_by: UserId,
        _revoke_by: UserId,
        revoke_grant_option: bool,
        _cascade: bool,
    ) -> Result<()> {
        for user_id in users {
            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
                u.revoke_privileges(privileges.clone(), revoke_grant_option);
            }
        }
        Ok(())
    }

    async fn alter_default_privilege(
        &self,
        _users: Vec<UserId>,
        _database_id: DatabaseId,
        _schemas: Vec<SchemaId>,
        _operation: AlterDefaultPrivilegeOperation,
        _operated_by: UserId,
    ) -> Result<()> {
        todo!()
    }
}

impl MockUserInfoWriter {
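    /// Seeds the user manager with the default super user and the default
    /// admin super user; generated user IDs start at the first non-reserved
    /// ID.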
    pub fn new(user_info: Arc<RwLock<UserInfoManager>>) -> Self {
        user_info.write().create_user(UserInfo {
            id: DEFAULT_SUPER_USER_ID,
            name: DEFAULT_SUPER_USER.to_owned(),
            is_super: true,
            can_create_db: true,
            can_create_user: true,
            can_login: true,
            ..Default::default()
        });
        user_info.write().create_user(UserInfo {
            id: DEFAULT_SUPER_USER_FOR_ADMIN_ID,
            name: DEFAULT_SUPER_USER_FOR_ADMIN.to_owned(),
            is_super: true,
            can_create_db: true,
            can_create_user: true,
            can_login: true,
            is_admin: true,
            ..Default::default()
        });
        Self {
            user_info,
            id: AtomicU32::new(NON_RESERVED_USER_ID.as_raw_id()),
        }
    }

    fn gen_id(&self) -> u32 {
        self.id.fetch_add(1, Ordering::SeqCst)
    }
}

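/// A stub [`FrontendMetaClient`] for tests: RPCs commonly hit by planner and
/// handler tests return empty or default values, while the rest are left
/// `unimplemented!()`.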
pub struct MockFrontendMetaClient {}

#[async_trait::async_trait]
impl FrontendMetaClient for MockFrontendMetaClient {
    async fn try_unregister(&self) {}

    async fn flush(&self, _database_id: DatabaseId) -> RpcResult<HummockVersionId> {
        Ok(INVALID_VERSION_ID)
    }

    async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult<Vec<u32>> {
        Ok(vec![])
    }

    async fn list_table_fragments(
        &self,
        _table_ids: &[JobId],
    ) -> RpcResult<HashMap<JobId, TableFragmentInfo>> {
        Ok(HashMap::default())
    }

    async fn list_streaming_job_states(&self) -> RpcResult<Vec<StreamingJobState>> {
        Ok(vec![])
    }

    async fn list_fragment_distribution(
        &self,
        _include_node: bool,
    ) -> RpcResult<Vec<FragmentDistribution>> {
        Ok(vec![])
    }

    async fn list_creating_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
        Ok(vec![])
    }

    async fn list_actor_states(&self) -> RpcResult<Vec<ActorState>> {
        Ok(vec![])
    }

    async fn list_actor_splits(&self) -> RpcResult<Vec<ActorSplit>> {
        Ok(vec![])
    }

    async fn list_meta_snapshots(&self) -> RpcResult<Vec<MetaSnapshotMetadata>> {
        Ok(vec![])
    }

    async fn list_sink_log_store_tables(
        &self,
    ) -> RpcResult<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>> {
        Ok(vec![])
    }

    async fn set_system_param(
        &self,
        _param: String,
        _value: Option<String>,
    ) -> RpcResult<Option<SystemParamsReader>> {
        Ok(Some(SystemParams::default().into()))
    }

    async fn get_session_params(&self) -> RpcResult<SessionConfig> {
        Ok(Default::default())
    }

    async fn set_session_param(&self, _param: String, _value: Option<String>) -> RpcResult<String> {
        Ok("".to_owned())
    }

    async fn get_ddl_progress(&self) -> RpcResult<Vec<DdlProgress>> {
        Ok(vec![])
    }

    async fn get_tables(
        &self,
        _table_ids: Vec<crate::catalog::TableId>,
        _include_dropped_tables: bool,
    ) -> RpcResult<HashMap<crate::catalog::TableId, Table>> {
        Ok(HashMap::new())
    }

    async fn list_hummock_pinned_versions(&self) -> RpcResult<Vec<(WorkerId, HummockVersionId)>> {
        unimplemented!()
    }

    async fn list_refresh_table_states(&self) -> RpcResult<Vec<RefreshTableState>> {
        unimplemented!()
    }

    async fn get_hummock_current_version(&self) -> RpcResult<HummockVersion> {
        Ok(HummockVersion::default())
    }

    async fn get_hummock_checkpoint_version(&self) -> RpcResult<HummockVersion> {
        unimplemented!()
    }

    async fn list_version_deltas(&self) -> RpcResult<Vec<HummockVersionDelta>> {
        unimplemented!()
    }

    async fn list_branched_objects(&self) -> RpcResult<Vec<BranchedObject>> {
        unimplemented!()
    }

    async fn list_hummock_compaction_group_configs(&self) -> RpcResult<Vec<CompactionGroupInfo>> {
        unimplemented!()
    }

    async fn list_hummock_active_write_limits(
        &self,
    ) -> RpcResult<HashMap<CompactionGroupId, WriteLimit>> {
        unimplemented!()
    }

    async fn list_hummock_meta_configs(&self) -> RpcResult<HashMap<String, String>> {
        unimplemented!()
    }

    async fn list_event_log(&self) -> RpcResult<Vec<EventLog>> {
        Ok(vec![])
    }

    async fn list_compact_task_assignment(&self) -> RpcResult<Vec<CompactTaskAssignment>> {
        unimplemented!()
    }

    async fn list_all_nodes(&self) -> RpcResult<Vec<WorkerNode>> {
        Ok(vec![])
    }

    async fn list_compact_task_progress(&self) -> RpcResult<Vec<CompactTaskProgress>> {
        unimplemented!()
    }

    async fn recover(&self) -> RpcResult<()> {
        unimplemented!()
    }

    async fn backup_meta(&self, _remarks: Option<String>) -> RpcResult<u64> {
        unimplemented!()
    }

    async fn get_backup_job_status(
        &self,
        _job_id: u64,
    ) -> RpcResult<(risingwave_pb::backup_service::BackupJobStatus, String)> {
        unimplemented!()
    }

    async fn delete_meta_snapshot(&self, _snapshot_ids: &[u64]) -> RpcResult<()> {
        unimplemented!()
    }

    async fn apply_throttle(
        &self,
        _throttle_target: PbThrottleTarget,
        _throttle_type: risingwave_pb::common::PbThrottleType,
        _id: u32,
        _rate_limit: Option<u32>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn alter_fragment_parallelism(
        &self,
        _fragment_ids: Vec<FragmentId>,
        _parallelism: Option<PbTableParallelism>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn get_cluster_recovery_status(&self) -> RpcResult<RecoveryStatus> {
        Ok(RecoveryStatus::StatusRunning)
    }

    async fn get_cluster_limits(&self) -> RpcResult<Vec<ClusterLimit>> {
        Ok(vec![])
    }

    async fn list_rate_limits(&self) -> RpcResult<Vec<RateLimitInfo>> {
        Ok(vec![])
    }

    async fn list_cdc_progress(&self) -> RpcResult<HashMap<JobId, PbCdcProgress>> {
        Ok(HashMap::default())
    }

    async fn get_meta_store_endpoint(&self) -> RpcResult<String> {
        unimplemented!()
    }

    async fn alter_sink_props(
        &self,
        _sink_id: SinkId,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
        _connector_conn_ref: Option<ConnectionId>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn alter_iceberg_table_props(
        &self,
        _table_id: TableId,
        _sink_id: SinkId,
        _source_id: SourceId,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
        _connector_conn_ref: Option<ConnectionId>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn alter_source_connector_props(
        &self,
        _source_id: SourceId,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
        _connector_conn_ref: Option<ConnectionId>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn alter_connection_connector_props(
        &self,
        _connection_id: u32,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
    ) -> RpcResult<()> {
        Ok(())
    }

    async fn list_hosted_iceberg_tables(&self) -> RpcResult<Vec<IcebergTable>> {
        unimplemented!()
    }

    async fn list_iceberg_compaction_status(&self) -> RpcResult<Vec<IcebergCompactionStatus>> {
        Ok(vec![])
    }

    async fn get_fragment_by_id(
        &self,
        _fragment_id: FragmentId,
    ) -> RpcResult<Option<FragmentDistribution>> {
        unimplemented!()
    }

    async fn get_fragment_vnodes(
        &self,
        _fragment_id: FragmentId,
    ) -> RpcResult<Vec<(ActorId, Vec<u32>)>> {
        unimplemented!()
    }

    async fn get_actor_vnodes(&self, _actor_id: ActorId) -> RpcResult<Vec<u32>> {
        unimplemented!()
    }

    fn worker_id(&self) -> WorkerId {
        0.into()
    }

    async fn set_sync_log_store_aligned(&self, _job_id: JobId, _aligned: bool) -> RpcResult<()> {
        Ok(())
    }

    async fn compact_iceberg_table(&self, _sink_id: SinkId) -> RpcResult<u64> {
        Ok(1)
    }

    async fn expire_iceberg_table_snapshots(&self, _sink_id: SinkId) -> RpcResult<()> {
        Ok(())
    }

    async fn refresh(&self, _request: RefreshRequest) -> RpcResult<RefreshResponse> {
        Ok(RefreshResponse { status: None })
    }

    fn cluster_id(&self) -> &str {
        "test-cluster-uuid"
    }

    async fn list_unmigrated_tables(&self) -> RpcResult<HashMap<crate::catalog::TableId, String>> {
        unimplemented!()
    }

    async fn get_hummock_table_change_log(
        &self,
        _start_epoch_inclusive: Option<u64>,
        _end_epoch_inclusive: Option<u64>,
        _table_ids: Option<HashSet<TableId>>,
        _exclude_empty: bool,
        _limit: Option<u32>,
    ) -> RpcResult<TableChangeLogs> {
        Ok(HashMap::default())
    }
}

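/// Protobuf schema text used by source tests. `TestRecordAlterType` and
/// `TestRecordExt` are variants of `TestRecord` with a changed field type and
/// an added field, for exercising schema-change handling.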
#[cfg(test)]
pub static PROTO_FILE_DATA: &str = r#"
    syntax = "proto3";
    package test;
    message TestRecord {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
    }
    message TestRecordAlterType {
      string id = 1;
      Country country = 3;
      int32 zipcode = 4;
      float rate = 5;
    }
    message TestRecordExt {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
      string name = 6;
    }
    message Country {
      string address = 1;
      City city = 2;
      string zipcode = 3;
    }
    message City {
      string address = 1;
      string zipcode = 2;
    }"#;

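/// Compiles `proto_data` with the `protoc` binary (which must be available on
/// `PATH`) into a file descriptor set and returns the temporary `.pb` file;
/// the file is removed when the returned [`NamedTempFile`] is dropped.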
pub fn create_proto_file(proto_data: &str) -> NamedTempFile {
    let in_file = Builder::new()
        .prefix("temp")
        .suffix(".proto")
        .rand_bytes(8)
        .tempfile()
        .unwrap();

    let out_file = Builder::new()
        .prefix("temp")
        .suffix(".pb")
        .rand_bytes(8)
        .tempfile()
        .unwrap();

    let mut file = in_file.as_file();
    file.write_all(proto_data.as_ref())
        .expect("writing binary to test file");
    file.flush().expect("flush temp file failed");
    let include_path = in_file
        .path()
        .parent()
        .unwrap()
        .to_string_lossy()
        .into_owned();
    let out_path = out_file.path().to_string_lossy().into_owned();
    let in_path = in_file.path().to_string_lossy().into_owned();
    let mut compile = std::process::Command::new("protoc");

    let out = compile
        .arg("--include_imports")
        .arg("-I")
        .arg(include_path)
        .arg(format!("--descriptor_set_out={}", out_path))
        .arg(in_path)
        .output()
        .expect("failed to compile proto");
    if !out.status.success() {
        panic!("compile proto failed \n output: {:?}", out);
    }
    out_file
}