1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::io::Write;
17use std::net::{IpAddr, Ipv4Addr, SocketAddr};
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU32, Ordering};
20
21use futures_async_stream::for_await;
22use parking_lot::RwLock;
23use pgwire::net::{Address, AddressRef};
24use pgwire::pg_response::StatementType;
25use pgwire::pg_server::{SessionId, SessionManager, UserAuthenticator};
26use pgwire::types::Row;
27use risingwave_common::catalog::{
28 AlterDatabaseParam, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER,
29 DEFAULT_SUPER_USER_FOR_ADMIN, DEFAULT_SUPER_USER_FOR_ADMIN_ID, DEFAULT_SUPER_USER_ID,
30 FunctionId, IndexId, NON_RESERVED_USER_ID, ObjectId, PG_CATALOG_SCHEMA_NAME,
31 RW_CATALOG_SCHEMA_NAME, TableId,
32};
33use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat};
34use risingwave_common::id::{ConnectionId, JobId, SourceId, SubscriptionId, ViewId, WorkerId};
35use risingwave_common::session_config::SessionConfig;
36use risingwave_common::system_param::AdaptiveParallelismStrategy;
37use risingwave_common::system_param::reader::SystemParamsReader;
38use risingwave_common::util::cluster_limit::ClusterLimit;
39use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
40use risingwave_hummock_sdk::change_log::TableChangeLogs;
41use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
42use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId, INVALID_VERSION_ID};
43use risingwave_pb::backup_service::MetaSnapshotMetadata;
44use risingwave_pb::catalog::{
45 PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbStreamJobStatus,
46 PbSubscription, PbTable, PbView, Table,
47};
48use risingwave_pb::common::{PbObjectType, WorkerNode};
49use risingwave_pb::ddl_service::alter_owner_request::Object;
50use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
51use risingwave_pb::ddl_service::{
52 DdlProgress, PbTableJobType, TableJobType, alter_name_request, alter_set_schema_request,
53 alter_swap_rename_request, create_connection_request, streaming_job_resource_type,
54};
55use risingwave_pb::hummock::write_limits::WriteLimit;
56use risingwave_pb::hummock::{
57 BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
58};
59use risingwave_pb::id::ActorId;
60use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
61use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
62use risingwave_pb::meta::list_actor_states_response::ActorState;
63use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
64use risingwave_pb::meta::list_iceberg_compaction_status_response::IcebergCompactionStatus;
65use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
66use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
67use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
68use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
69use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
70use risingwave_pb::meta::{
71 EventLog, FragmentDistribution, ObjectDependency as PbObjectDependency, PbTableParallelism,
72 PbThrottleTarget, RecoveryStatus, RefreshRequest, RefreshResponse, SystemParams,
73 list_sink_log_store_tables_response,
74};
75use risingwave_pb::secret::PbSecretRef;
76use risingwave_pb::stream_plan::StreamFragmentGraph;
77use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
78use risingwave_pb::user::update_user_request::UpdateField;
79use risingwave_pb::user::{GrantPrivilege, UserInfo};
80use risingwave_rpc_client::error::Result as RpcResult;
81use tempfile::{Builder, NamedTempFile};
82
83use crate::FrontendOpts;
84use crate::catalog::catalog_service::CatalogWriter;
85use crate::catalog::root_catalog::Catalog;
86use crate::catalog::{DatabaseId, FragmentId, SchemaId, SecretId, SinkId};
87use crate::error::{ErrorCode, Result, RwError};
88use crate::handler::RwPgResponse;
89use crate::meta_client::FrontendMetaClient;
90use crate::scheduler::HummockSnapshotManagerRef;
91use crate::session::{AuthContext, FrontendEnv, SessionImpl};
92use crate::user::UserId;
93use crate::user::user_manager::UserInfoManager;
94use crate::user::user_service::UserInfoWriter;
95
96pub struct LocalFrontend {
98 pub opts: FrontendOpts,
99 env: FrontendEnv,
100}
101
102impl SessionManager for LocalFrontend {
103 type Error = RwError;
104 type Session = SessionImpl;
105
106 fn create_dummy_session(
107 &self,
108 _database_id: DatabaseId,
109 ) -> std::result::Result<Arc<Self::Session>, Self::Error> {
110 unreachable!()
111 }
112
113 fn connect(
114 &self,
115 _database: &str,
116 _user_name: &str,
117 _peer_addr: AddressRef,
118 ) -> std::result::Result<Arc<Self::Session>, Self::Error> {
119 Ok(self.session_ref())
120 }
121
122 fn cancel_queries_in_session(&self, _session_id: SessionId) {
123 unreachable!()
124 }
125
126 fn cancel_creating_jobs_in_session(&self, _session_id: SessionId) {
127 unreachable!()
128 }
129
130 fn end_session(&self, _session: &Self::Session) {
131 unreachable!()
132 }
133}
134
135impl LocalFrontend {
136 #[expect(clippy::unused_async)]
137 pub async fn new(opts: FrontendOpts) -> Self {
138 let env = FrontendEnv::mock();
139 Self { opts, env }
140 }
141
142 pub async fn run_sql(
143 &self,
144 sql: impl Into<String>,
145 ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
146 let sql: Arc<str> = Arc::from(sql.into());
147 Box::pin(self.session_ref().run_statement(sql, vec![])).await
148 }
149
150 pub async fn run_sql_with_session(
151 &self,
152 session_ref: Arc<SessionImpl>,
153 sql: impl Into<String>,
154 ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
155 let sql: Arc<str> = Arc::from(sql.into());
156 Box::pin(session_ref.run_statement(sql, vec![])).await
157 }
158
159 pub async fn run_user_sql(
160 &self,
161 sql: impl Into<String>,
162 database: String,
163 user_name: String,
164 user_id: UserId,
165 ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
166 let sql: Arc<str> = Arc::from(sql.into());
167 Box::pin(
168 self.session_user_ref(database, user_name, user_id)
169 .run_statement(sql, vec![]),
170 )
171 .await
172 }
173
174 pub async fn query_formatted_result(&self, sql: impl Into<String>) -> Vec<String> {
175 let mut rsp = self.run_sql(sql).await.unwrap();
176 let mut res = vec![];
177 #[for_await]
178 for row_set in rsp.values_stream() {
179 for row in row_set.unwrap() {
180 res.push(format!("{:?}", row));
181 }
182 }
183 res
184 }
185
186 pub async fn get_explain_output(&self, sql: impl Into<String>) -> String {
187 let mut rsp = self.run_sql(sql).await.unwrap();
188 assert_eq!(rsp.stmt_type(), StatementType::EXPLAIN);
189 let mut res = String::new();
190 #[for_await]
191 for row_set in rsp.values_stream() {
192 for row in row_set.unwrap() {
193 let row: Row = row;
194 let row = row.values()[0].as_ref().unwrap();
195 res += std::str::from_utf8(row).unwrap();
196 res += "\n";
197 }
198 }
199 res
200 }
201
202 pub fn session_ref(&self) -> Arc<SessionImpl> {
204 self.session_user_ref(
205 DEFAULT_DATABASE_NAME.to_owned(),
206 DEFAULT_SUPER_USER.to_owned(),
207 DEFAULT_SUPER_USER_ID,
208 )
209 }
210
211 pub fn session_user_ref(
212 &self,
213 database: String,
214 user_name: String,
215 user_id: UserId,
216 ) -> Arc<SessionImpl> {
217 Arc::new(SessionImpl::new(
218 self.env.clone(),
219 AuthContext::new(database, user_name, user_id),
220 UserAuthenticator::None,
221 (0, 0),
223 Address::Tcp(SocketAddr::new(
224 IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
225 6666,
226 ))
227 .into(),
228 Default::default(),
229 ))
230 }
231}
232
233pub async fn get_explain_output(mut rsp: RwPgResponse) -> String {
234 if rsp.stmt_type() != StatementType::EXPLAIN {
235 panic!("RESPONSE INVALID: {rsp:?}");
236 }
237 let mut res = String::new();
238 #[for_await]
239 for row_set in rsp.values_stream() {
240 for row in row_set.unwrap() {
241 let row: Row = row;
242 let row = row.values()[0].as_ref().unwrap();
243 res += std::str::from_utf8(row).unwrap();
244 res += "\n";
245 }
246 }
247 res
248}
249
250pub struct MockCatalogWriter {
251 catalog: Arc<RwLock<Catalog>>,
252 id: AtomicU32,
253 table_id_to_schema_id: RwLock<HashMap<u32, SchemaId>>,
254 schema_id_to_database_id: RwLock<HashMap<SchemaId, DatabaseId>>,
255 hummock_snapshot_manager: HummockSnapshotManagerRef,
256}
257
258#[async_trait::async_trait]
259impl CatalogWriter for MockCatalogWriter {
260 async fn create_database(
261 &self,
262 db_name: &str,
263 owner: UserId,
264 resource_group: &str,
265 barrier_interval_ms: Option<u32>,
266 checkpoint_frequency: Option<u64>,
267 ) -> Result<()> {
268 let database_id = DatabaseId::new(self.gen_id());
269 self.catalog.write().create_database(&PbDatabase {
270 name: db_name.to_owned(),
271 id: database_id,
272 owner,
273 resource_group: resource_group.to_owned(),
274 barrier_interval_ms,
275 checkpoint_frequency,
276 });
277 self.create_schema(database_id, DEFAULT_SCHEMA_NAME, owner)
278 .await?;
279 self.create_schema(database_id, PG_CATALOG_SCHEMA_NAME, owner)
280 .await?;
281 self.create_schema(database_id, RW_CATALOG_SCHEMA_NAME, owner)
282 .await?;
283 Ok(())
284 }
285
286 async fn create_schema(
287 &self,
288 db_id: DatabaseId,
289 schema_name: &str,
290 owner: UserId,
291 ) -> Result<()> {
292 let id = self.gen_id();
293 self.catalog.write().create_schema(&PbSchema {
294 id,
295 name: schema_name.to_owned(),
296 database_id: db_id,
297 owner,
298 });
299 self.add_schema_id(id, db_id);
300 Ok(())
301 }
302
303 async fn create_materialized_view(
304 &self,
305 mut table: PbTable,
306 _graph: StreamFragmentGraph,
307 dependencies: HashSet<ObjectId>,
308 _resource_type: streaming_job_resource_type::ResourceType,
309 _if_not_exists: bool,
310 ) -> Result<()> {
311 table.id = self.gen_id();
312 table.stream_job_status = PbStreamJobStatus::Created as _;
313 table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
314 self.catalog.write().create_table(&table);
315 self.add_table_or_source_id(table.id.as_raw_id(), table.schema_id, table.database_id);
316 self.insert_object_dependencies(table.id.as_object_id(), dependencies);
317 self.hummock_snapshot_manager.add_table_for_test(table.id);
318 Ok(())
319 }
320
321 async fn replace_materialized_view(
322 &self,
323 mut table: PbTable,
324 _graph: StreamFragmentGraph,
325 ) -> Result<()> {
326 table.stream_job_status = PbStreamJobStatus::Created as _;
327 assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
328 self.catalog.write().update_table(&table);
329 Ok(())
330 }
331
332 async fn create_view(&self, mut view: PbView, dependencies: HashSet<ObjectId>) -> Result<()> {
333 view.id = self.gen_id();
334 self.catalog.write().create_view(&view);
335 self.add_table_or_source_id(view.id.as_raw_id(), view.schema_id, view.database_id);
336 self.insert_object_dependencies(view.id.as_object_id(), dependencies);
337 Ok(())
338 }
339
340 async fn create_table(
341 &self,
342 source: Option<PbSource>,
343 mut table: PbTable,
344 graph: StreamFragmentGraph,
345 _job_type: PbTableJobType,
346 if_not_exists: bool,
347 dependencies: HashSet<ObjectId>,
348 ) -> Result<()> {
349 if let Some(source) = source {
350 let source_id = self.create_source_inner(source)?;
351 table.optional_associated_source_id = Some(source_id.into());
352 }
353 self.create_materialized_view(
354 table,
355 graph,
356 dependencies,
357 streaming_job_resource_type::ResourceType::Regular(true),
358 if_not_exists,
359 )
360 .await?;
361 Ok(())
362 }
363
364 async fn replace_table(
365 &self,
366 _source: Option<PbSource>,
367 mut table: PbTable,
368 _graph: StreamFragmentGraph,
369 _job_type: TableJobType,
370 ) -> Result<()> {
371 table.stream_job_status = PbStreamJobStatus::Created as _;
372 assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
373 self.catalog.write().update_table(&table);
374 Ok(())
375 }
376
377 async fn replace_source(&self, source: PbSource, _graph: StreamFragmentGraph) -> Result<()> {
378 self.catalog.write().update_source(&source);
379 Ok(())
380 }
381
382 async fn create_source(
383 &self,
384 source: PbSource,
385 _graph: Option<StreamFragmentGraph>,
386 _if_not_exists: bool,
387 ) -> Result<()> {
388 self.create_source_inner(source).map(|_| ())
389 }
390
391 async fn create_sink(
392 &self,
393 sink: PbSink,
394 graph: StreamFragmentGraph,
395 dependencies: HashSet<ObjectId>,
396 _if_not_exists: bool,
397 ) -> Result<()> {
398 let sink_id = self.create_sink_inner(sink, graph)?;
399 self.insert_object_dependencies(sink_id.as_object_id(), dependencies);
400 Ok(())
401 }
402
403 async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
404 self.create_subscription_inner(subscription)
405 }
406
407 async fn create_index(
408 &self,
409 mut index: PbIndex,
410 mut index_table: PbTable,
411 _graph: StreamFragmentGraph,
412 _if_not_exists: bool,
413 ) -> Result<()> {
414 index_table.id = self.gen_id();
415 index_table.stream_job_status = PbStreamJobStatus::Created as _;
416 index_table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
417 self.catalog.write().create_table(&index_table);
418 self.add_table_or_index_id(
419 index_table.id.as_raw_id(),
420 index_table.schema_id,
421 index_table.database_id,
422 );
423
424 index.id = index_table.id.as_raw_id().into();
425 index.index_table_id = index_table.id;
426 self.catalog.write().create_index(&index);
427 Ok(())
428 }
429
430 async fn create_function(&self, _function: PbFunction) -> Result<()> {
431 unreachable!()
432 }
433
434 async fn create_connection(
435 &self,
436 _connection_name: String,
437 _database_id: DatabaseId,
438 _schema_id: SchemaId,
439 _owner_id: UserId,
440 _connection: create_connection_request::Payload,
441 ) -> Result<()> {
442 unreachable!()
443 }
444
445 async fn create_secret(
446 &self,
447 _secret_name: String,
448 _database_id: DatabaseId,
449 _schema_id: SchemaId,
450 _owner_id: UserId,
451 _payload: Vec<u8>,
452 ) -> Result<()> {
453 unreachable!()
454 }
455
456 async fn comment_on(&self, _comment: PbComment) -> Result<()> {
457 unreachable!()
458 }
459
460 async fn drop_table(
461 &self,
462 source_id: Option<SourceId>,
463 table_id: TableId,
464 cascade: bool,
465 ) -> Result<()> {
466 if cascade {
467 return Err(ErrorCode::NotSupported(
468 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
469 "use drop instead".to_owned(),
470 )
471 .into());
472 }
473 if let Some(source_id) = source_id {
474 self.drop_table_or_source_id(source_id.as_raw_id());
475 }
476 let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
477 let indexes =
478 self.catalog
479 .read()
480 .get_all_indexes_related_to_object(database_id, schema_id, table_id);
481 for index in indexes {
482 self.drop_index(index.id, cascade).await?;
483 }
484 self.catalog
485 .write()
486 .drop_table(database_id, schema_id, table_id);
487 if let Some(source_id) = source_id {
488 self.catalog
489 .write()
490 .drop_source(database_id, schema_id, source_id);
491 }
492 Ok(())
493 }
494
495 async fn drop_view(&self, _view_id: ViewId, _cascade: bool) -> Result<()> {
496 unreachable!()
497 }
498
499 async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
500 if cascade {
501 return Err(ErrorCode::NotSupported(
502 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
503 "use drop instead".to_owned(),
504 )
505 .into());
506 }
507 let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
508 let indexes =
509 self.catalog
510 .read()
511 .get_all_indexes_related_to_object(database_id, schema_id, table_id);
512 for index in indexes {
513 self.drop_index(index.id, cascade).await?;
514 }
515 self.catalog
516 .write()
517 .drop_table(database_id, schema_id, table_id);
518 Ok(())
519 }
520
521 async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()> {
522 if cascade {
523 return Err(ErrorCode::NotSupported(
524 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
525 "use drop instead".to_owned(),
526 )
527 .into());
528 }
529 let (database_id, schema_id) = self.drop_table_or_source_id(source_id.as_raw_id());
530 self.catalog
531 .write()
532 .drop_source(database_id, schema_id, source_id);
533 Ok(())
534 }
535
536 async fn reset_source(&self, _source_id: SourceId) -> Result<()> {
537 Ok(())
538 }
539
540 async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()> {
541 if cascade {
542 return Err(ErrorCode::NotSupported(
543 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
544 "use drop instead".to_owned(),
545 )
546 .into());
547 }
548 let (database_id, schema_id) = self.drop_table_or_sink_id(sink_id.as_raw_id());
549 self.catalog
550 .write()
551 .drop_sink(database_id, schema_id, sink_id);
552 Ok(())
553 }
554
555 async fn drop_subscription(
556 &self,
557 subscription_id: SubscriptionId,
558 cascade: bool,
559 ) -> Result<()> {
560 if cascade {
561 return Err(ErrorCode::NotSupported(
562 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
563 "use drop instead".to_owned(),
564 )
565 .into());
566 }
567 let (database_id, schema_id) =
568 self.drop_table_or_subscription_id(subscription_id.as_raw_id());
569 self.catalog
570 .write()
571 .drop_subscription(database_id, schema_id, subscription_id);
572 Ok(())
573 }
574
575 async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
576 if cascade {
577 return Err(ErrorCode::NotSupported(
578 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
579 "use drop instead".to_owned(),
580 )
581 .into());
582 }
583 let &schema_id = self
584 .table_id_to_schema_id
585 .read()
586 .get(&index_id.as_raw_id())
587 .unwrap();
588 let database_id = self.get_database_id_by_schema(schema_id);
589
590 let index = {
591 let catalog_reader = self.catalog.read();
592 let schema_catalog = catalog_reader
593 .get_schema_by_id(database_id, schema_id)
594 .unwrap();
595 schema_catalog.get_index_by_id(index_id).unwrap().clone()
596 };
597
598 let index_table_id = index.index_table().id;
599 let (database_id, schema_id) = self.drop_table_or_index_id(index_id.as_raw_id());
600 self.catalog
601 .write()
602 .drop_index(database_id, schema_id, index_id);
603 self.catalog
604 .write()
605 .drop_table(database_id, schema_id, index_table_id);
606 Ok(())
607 }
608
609 async fn drop_function(&self, _function_id: FunctionId, _cascade: bool) -> Result<()> {
610 unreachable!()
611 }
612
613 async fn drop_connection(&self, _connection_id: ConnectionId, _cascade: bool) -> Result<()> {
614 unreachable!()
615 }
616
617 async fn drop_secret(&self, _secret_id: SecretId, _cascade: bool) -> Result<()> {
618 unreachable!()
619 }
620
621 async fn drop_database(&self, database_id: DatabaseId) -> Result<()> {
622 self.catalog.write().drop_database(database_id);
623 Ok(())
624 }
625
626 async fn drop_schema(&self, schema_id: SchemaId, _cascade: bool) -> Result<()> {
627 let database_id = self.drop_schema_id(schema_id);
628 self.catalog.write().drop_schema(database_id, schema_id);
629 Ok(())
630 }
631
632 async fn alter_name(
633 &self,
634 object_id: alter_name_request::Object,
635 object_name: &str,
636 ) -> Result<()> {
637 match object_id {
638 alter_name_request::Object::TableId(table_id) => {
639 self.catalog
640 .write()
641 .alter_table_name_by_id(table_id, object_name);
642 Ok(())
643 }
644 _ => {
645 unimplemented!()
646 }
647 }
648 }
649
650 async fn alter_source(&self, source: PbSource) -> Result<()> {
651 self.catalog.write().update_source(&source);
652 Ok(())
653 }
654
655 async fn alter_owner(&self, object: Object, owner_id: UserId) -> Result<()> {
656 for database in self.catalog.read().iter_databases() {
657 for schema in database.iter_schemas() {
658 match object {
659 Object::TableId(table_id) => {
660 if let Some(table) = schema.get_created_table_by_id(TableId::from(table_id))
661 {
662 let mut pb_table = table.to_prost();
663 pb_table.owner = owner_id;
664 self.catalog.write().update_table(&pb_table);
665 return Ok(());
666 }
667 }
668 _ => unreachable!(),
669 }
670 }
671 }
672
673 Err(ErrorCode::ItemNotFound(format!("object not found: {:?}", object)).into())
674 }
675
676 async fn alter_subscription_retention(
677 &self,
678 subscription_id: SubscriptionId,
679 retention_seconds: u64,
680 definition: String,
681 ) -> Result<()> {
682 for database in self.catalog.read().iter_databases() {
683 for schema in database.iter_schemas() {
684 if let Some(subscription) = schema.get_subscription_by_id(subscription_id) {
685 let mut pb_subscription = subscription.to_proto();
686 pb_subscription.retention_seconds = retention_seconds;
687 pb_subscription.definition = definition;
688 self.catalog.write().update_subscription(&pb_subscription);
689 return Ok(());
690 }
691 }
692 }
693
694 Err(
695 ErrorCode::ItemNotFound(format!("subscription not found: {:?}", subscription_id))
696 .into(),
697 )
698 }
699
700 async fn alter_set_schema(
701 &self,
702 object: alter_set_schema_request::Object,
703 new_schema_id: SchemaId,
704 ) -> Result<()> {
705 match object {
706 alter_set_schema_request::Object::TableId(table_id) => {
707 let mut pb_table = {
708 let reader = self.catalog.read();
709 let table = reader.get_any_table_by_id(table_id)?.to_owned();
710 table.to_prost()
711 };
712 pb_table.schema_id = new_schema_id;
713 self.catalog.write().update_table(&pb_table);
714 self.table_id_to_schema_id
715 .write()
716 .insert(table_id.as_raw_id(), new_schema_id);
717 Ok(())
718 }
719 _ => unreachable!(),
720 }
721 }
722
723 async fn alter_parallelism(
724 &self,
725 _job_id: JobId,
726 _parallelism: PbTableParallelism,
727 _adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
728 _deferred: bool,
729 ) -> Result<()> {
730 todo!()
731 }
732
733 async fn alter_backfill_parallelism(
734 &self,
735 _job_id: JobId,
736 _parallelism: Option<PbTableParallelism>,
737 _adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
738 _deferred: bool,
739 ) -> Result<()> {
740 todo!()
741 }
742
743 async fn alter_config(
744 &self,
745 _job_id: JobId,
746 _entries_to_add: HashMap<String, String>,
747 _keys_to_remove: Vec<String>,
748 ) -> Result<()> {
749 todo!()
750 }
751
752 async fn alter_swap_rename(&self, _object: alter_swap_rename_request::Object) -> Result<()> {
753 todo!()
754 }
755
756 async fn alter_secret(
757 &self,
758 _secret_id: SecretId,
759 _secret_name: String,
760 _database_id: DatabaseId,
761 _schema_id: SchemaId,
762 _owner_id: UserId,
763 _payload: Vec<u8>,
764 ) -> Result<()> {
765 unreachable!()
766 }
767
768 async fn alter_resource_group(
769 &self,
770 _table_id: TableId,
771 _resource_group: Option<String>,
772 _deferred: bool,
773 ) -> Result<()> {
774 todo!()
775 }
776
777 async fn alter_database_param(
778 &self,
779 database_id: DatabaseId,
780 param: AlterDatabaseParam,
781 ) -> Result<()> {
782 let mut pb_database = {
783 let reader = self.catalog.read();
784 let database = reader.get_database_by_id(database_id)?.to_owned();
785 database.to_prost()
786 };
787 match param {
788 AlterDatabaseParam::BarrierIntervalMs(interval) => {
789 pb_database.barrier_interval_ms = interval;
790 }
791 AlterDatabaseParam::CheckpointFrequency(frequency) => {
792 pb_database.checkpoint_frequency = frequency;
793 }
794 }
795 self.catalog.write().update_database(&pb_database);
796 Ok(())
797 }
798
799 async fn create_iceberg_table(
800 &self,
801 _table_job_info: PbTableJobInfo,
802 _sink_job_info: PbSinkJobInfo,
803 _iceberg_source: PbSource,
804 _if_not_exists: bool,
805 ) -> Result<()> {
806 todo!()
807 }
808
809 async fn wait(&self, _job_id: Option<JobId>) -> Result<()> {
810 Ok(())
811 }
812}
813
814impl MockCatalogWriter {
815 pub fn new(
816 catalog: Arc<RwLock<Catalog>>,
817 hummock_snapshot_manager: HummockSnapshotManagerRef,
818 ) -> Self {
819 catalog.write().create_database(&PbDatabase {
820 id: 0.into(),
821 name: DEFAULT_DATABASE_NAME.to_owned(),
822 owner: DEFAULT_SUPER_USER_ID,
823 resource_group: DEFAULT_RESOURCE_GROUP.to_owned(),
824 barrier_interval_ms: None,
825 checkpoint_frequency: None,
826 });
827 catalog.write().create_schema(&PbSchema {
828 id: 1.into(),
829 name: DEFAULT_SCHEMA_NAME.to_owned(),
830 database_id: 0.into(),
831 owner: DEFAULT_SUPER_USER_ID,
832 });
833 catalog.write().create_schema(&PbSchema {
834 id: 2.into(),
835 name: PG_CATALOG_SCHEMA_NAME.to_owned(),
836 database_id: 0.into(),
837 owner: DEFAULT_SUPER_USER_ID,
838 });
839 catalog.write().create_schema(&PbSchema {
840 id: 3.into(),
841 name: RW_CATALOG_SCHEMA_NAME.to_owned(),
842 database_id: 0.into(),
843 owner: DEFAULT_SUPER_USER_ID,
844 });
845 let mut map: HashMap<SchemaId, DatabaseId> = HashMap::new();
846 map.insert(1_u32.into(), 0_u32.into());
847 map.insert(2_u32.into(), 0_u32.into());
848 map.insert(3_u32.into(), 0_u32.into());
849 Self {
850 catalog,
851 id: AtomicU32::new(3),
852 table_id_to_schema_id: Default::default(),
853 schema_id_to_database_id: RwLock::new(map),
854 hummock_snapshot_manager,
855 }
856 }
857
858 fn gen_id<T: From<u32>>(&self) -> T {
859 (self.id.fetch_add(1, Ordering::SeqCst) + 1).into()
861 }
862
863 fn add_table_or_source_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
864 self.table_id_to_schema_id
865 .write()
866 .insert(table_id, schema_id);
867 }
868
869 fn drop_table_or_source_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
870 let schema_id = self
871 .table_id_to_schema_id
872 .write()
873 .remove(&table_id)
874 .unwrap();
875 (self.get_database_id_by_schema(schema_id), schema_id)
876 }
877
878 fn add_table_or_sink_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
879 self.table_id_to_schema_id
880 .write()
881 .insert(table_id, schema_id);
882 }
883
884 fn add_table_or_subscription_id(
885 &self,
886 table_id: u32,
887 schema_id: SchemaId,
888 _database_id: DatabaseId,
889 ) {
890 self.table_id_to_schema_id
891 .write()
892 .insert(table_id, schema_id);
893 }
894
895 fn add_table_or_index_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
896 self.table_id_to_schema_id
897 .write()
898 .insert(table_id, schema_id);
899 }
900
901 fn drop_table_or_sink_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
902 let schema_id = self
903 .table_id_to_schema_id
904 .write()
905 .remove(&table_id)
906 .unwrap();
907 (self.get_database_id_by_schema(schema_id), schema_id)
908 }
909
910 fn drop_table_or_subscription_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
911 let schema_id = self
912 .table_id_to_schema_id
913 .write()
914 .remove(&table_id)
915 .unwrap();
916 (self.get_database_id_by_schema(schema_id), schema_id)
917 }
918
919 fn drop_table_or_index_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
920 let schema_id = self
921 .table_id_to_schema_id
922 .write()
923 .remove(&table_id)
924 .unwrap();
925 (self.get_database_id_by_schema(schema_id), schema_id)
926 }
927
928 fn add_schema_id(&self, schema_id: SchemaId, database_id: DatabaseId) {
929 self.schema_id_to_database_id
930 .write()
931 .insert(schema_id, database_id);
932 }
933
934 fn drop_schema_id(&self, schema_id: SchemaId) -> DatabaseId {
935 self.schema_id_to_database_id
936 .write()
937 .remove(&schema_id)
938 .unwrap()
939 }
940
941 fn create_source_inner(&self, mut source: PbSource) -> Result<SourceId> {
942 source.id = self.gen_id();
943 self.catalog.write().create_source(&source);
944 self.add_table_or_source_id(source.id.as_raw_id(), source.schema_id, source.database_id);
945 Ok(source.id)
946 }
947
948 fn create_sink_inner(&self, mut sink: PbSink, _graph: StreamFragmentGraph) -> Result<SinkId> {
949 sink.id = self.gen_id();
950 sink.stream_job_status = PbStreamJobStatus::Created as _;
951 self.catalog.write().create_sink(&sink);
952 self.add_table_or_sink_id(sink.id.as_raw_id(), sink.schema_id, sink.database_id);
953 Ok(sink.id)
954 }
955
956 fn create_subscription_inner(&self, mut subscription: PbSubscription) -> Result<()> {
957 subscription.id = self.gen_id();
958 self.catalog.write().create_subscription(&subscription);
959 self.add_table_or_subscription_id(
960 subscription.id.as_raw_id(),
961 subscription.schema_id,
962 subscription.database_id,
963 );
964 Ok(())
965 }
966
967 fn get_database_id_by_schema(&self, schema_id: SchemaId) -> DatabaseId {
968 *self
969 .schema_id_to_database_id
970 .read()
971 .get(&schema_id)
972 .unwrap()
973 }
974
975 fn get_object_type(&self, object_id: ObjectId) -> PbObjectType {
976 let catalog = self.catalog.read();
977 for database in catalog.iter_databases() {
978 for schema in database.iter_schemas() {
979 if let Some(table) = schema.get_created_table_by_id(object_id.as_table_id()) {
980 return if table.is_mview() {
981 PbObjectType::Mview
982 } else {
983 PbObjectType::Table
984 };
985 }
986 if schema.get_source_by_id(object_id.as_source_id()).is_some() {
987 return PbObjectType::Source;
988 }
989 if schema.get_view_by_id(object_id.as_view_id()).is_some() {
990 return PbObjectType::View;
991 }
992 if schema.get_index_by_id(object_id.as_index_id()).is_some() {
993 return PbObjectType::Index;
994 }
995 }
996 }
997 PbObjectType::Unspecified
998 }
999
1000 fn insert_object_dependencies(&self, object_id: ObjectId, dependencies: HashSet<ObjectId>) {
1001 if dependencies.is_empty() {
1002 return;
1003 }
1004 let dependencies = dependencies
1005 .into_iter()
1006 .map(|referenced_object_id| PbObjectDependency {
1007 object_id,
1008 referenced_object_id,
1009 referenced_object_type: self.get_object_type(referenced_object_id) as i32,
1010 })
1011 .collect();
1012 self.catalog
1013 .write()
1014 .insert_object_dependencies(dependencies);
1015 }
1016}
1017
1018pub struct MockUserInfoWriter {
1019 id: AtomicU32,
1020 user_info: Arc<RwLock<UserInfoManager>>,
1021}
1022
1023#[async_trait::async_trait]
1024impl UserInfoWriter for MockUserInfoWriter {
1025 async fn create_user(&self, user: UserInfo) -> Result<()> {
1026 let mut user = user;
1027 user.id = self.gen_id().into();
1028 self.user_info.write().create_user(user);
1029 Ok(())
1030 }
1031
1032 async fn drop_user(&self, id: UserId) -> Result<()> {
1033 self.user_info.write().drop_user(id);
1034 Ok(())
1035 }
1036
1037 async fn update_user(
1038 &self,
1039 update_user: UserInfo,
1040 update_fields: Vec<UpdateField>,
1041 ) -> Result<()> {
1042 let mut lock = self.user_info.write();
1043 let id = update_user.get_id();
1044 let Some(old_name) = lock.get_user_name_by_id(id) else {
1045 return Ok(());
1046 };
1047 let mut user_info = lock.get_user_by_name(&old_name).unwrap().to_prost();
1048 update_fields.into_iter().for_each(|field| match field {
1049 UpdateField::Super => user_info.is_super = update_user.is_super,
1050 UpdateField::Login => user_info.can_login = update_user.can_login,
1051 UpdateField::CreateDb => user_info.can_create_db = update_user.can_create_db,
1052 UpdateField::CreateUser => user_info.can_create_user = update_user.can_create_user,
1053 UpdateField::AuthInfo => user_info.auth_info.clone_from(&update_user.auth_info),
1054 UpdateField::Rename => user_info.name.clone_from(&update_user.name),
1055 UpdateField::Admin => user_info.is_admin = update_user.is_admin,
1056 UpdateField::Unspecified => unreachable!(),
1057 });
1058 lock.update_user(update_user);
1059 Ok(())
1060 }
1061
1062 async fn grant_privilege(
1065 &self,
1066 users: Vec<UserId>,
1067 privileges: Vec<GrantPrivilege>,
1068 with_grant_option: bool,
1069 _grantor: UserId,
1070 ) -> Result<()> {
1071 let privileges = privileges
1072 .into_iter()
1073 .map(|mut p| {
1074 p.action_with_opts
1075 .iter_mut()
1076 .for_each(|ao| ao.with_grant_option = with_grant_option);
1077 p
1078 })
1079 .collect::<Vec<_>>();
1080 for user_id in users {
1081 if let Some(u) = self.user_info.write().get_user_mut(user_id) {
1082 u.extend_privileges(privileges.clone());
1083 }
1084 }
1085 Ok(())
1086 }
1087
1088 async fn revoke_privilege(
1091 &self,
1092 users: Vec<UserId>,
1093 privileges: Vec<GrantPrivilege>,
1094 _granted_by: UserId,
1095 _revoke_by: UserId,
1096 revoke_grant_option: bool,
1097 _cascade: bool,
1098 ) -> Result<()> {
1099 for user_id in users {
1100 if let Some(u) = self.user_info.write().get_user_mut(user_id) {
1101 u.revoke_privileges(privileges.clone(), revoke_grant_option);
1102 }
1103 }
1104 Ok(())
1105 }
1106
1107 async fn alter_default_privilege(
1108 &self,
1109 _users: Vec<UserId>,
1110 _database_id: DatabaseId,
1111 _schemas: Vec<SchemaId>,
1112 _operation: AlterDefaultPrivilegeOperation,
1113 _operated_by: UserId,
1114 ) -> Result<()> {
1115 todo!()
1116 }
1117}
1118
1119impl MockUserInfoWriter {
1120 pub fn new(user_info: Arc<RwLock<UserInfoManager>>) -> Self {
1121 user_info.write().create_user(UserInfo {
1122 id: DEFAULT_SUPER_USER_ID,
1123 name: DEFAULT_SUPER_USER.to_owned(),
1124 is_super: true,
1125 can_create_db: true,
1126 can_create_user: true,
1127 can_login: true,
1128 ..Default::default()
1129 });
1130 user_info.write().create_user(UserInfo {
1131 id: DEFAULT_SUPER_USER_FOR_ADMIN_ID,
1132 name: DEFAULT_SUPER_USER_FOR_ADMIN.to_owned(),
1133 is_super: true,
1134 can_create_db: true,
1135 can_create_user: true,
1136 can_login: true,
1137 is_admin: true,
1138 ..Default::default()
1139 });
1140 Self {
1141 user_info,
1142 id: AtomicU32::new(NON_RESERVED_USER_ID.as_raw_id()),
1143 }
1144 }
1145
1146 fn gen_id(&self) -> u32 {
1147 self.id.fetch_add(1, Ordering::SeqCst)
1148 }
1149}
1150
1151pub struct MockFrontendMetaClient {}
1152
1153#[async_trait::async_trait]
1154impl FrontendMetaClient for MockFrontendMetaClient {
1155 async fn try_unregister(&self) {}
1156
1157 async fn flush(&self, _database_id: DatabaseId) -> RpcResult<HummockVersionId> {
1158 Ok(INVALID_VERSION_ID)
1159 }
1160
1161 async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult<Vec<u32>> {
1162 Ok(vec![])
1163 }
1164
1165 async fn list_table_fragments(
1166 &self,
1167 _table_ids: &[JobId],
1168 ) -> RpcResult<HashMap<JobId, TableFragmentInfo>> {
1169 Ok(HashMap::default())
1170 }
1171
1172 async fn list_streaming_job_states(&self) -> RpcResult<Vec<StreamingJobState>> {
1173 Ok(vec![])
1174 }
1175
1176 async fn list_fragment_distribution(
1177 &self,
1178 _include_node: bool,
1179 ) -> RpcResult<Vec<FragmentDistribution>> {
1180 Ok(vec![])
1181 }
1182
1183 async fn list_creating_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
1184 Ok(vec![])
1185 }
1186
1187 async fn list_actor_states(&self) -> RpcResult<Vec<ActorState>> {
1188 Ok(vec![])
1189 }
1190
1191 async fn list_actor_splits(&self) -> RpcResult<Vec<ActorSplit>> {
1192 Ok(vec![])
1193 }
1194
1195 async fn list_meta_snapshots(&self) -> RpcResult<Vec<MetaSnapshotMetadata>> {
1196 Ok(vec![])
1197 }
1198
1199 async fn list_sink_log_store_tables(
1200 &self,
1201 ) -> RpcResult<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>> {
1202 Ok(vec![])
1203 }
1204
1205 async fn set_system_param(
1206 &self,
1207 _param: String,
1208 _value: Option<String>,
1209 ) -> RpcResult<Option<SystemParamsReader>> {
1210 Ok(Some(SystemParams::default().into()))
1211 }
1212
1213 async fn get_session_params(&self) -> RpcResult<SessionConfig> {
1214 Ok(Default::default())
1215 }
1216
1217 async fn set_session_param(&self, _param: String, _value: Option<String>) -> RpcResult<String> {
1218 Ok("".to_owned())
1219 }
1220
1221 async fn get_ddl_progress(&self) -> RpcResult<Vec<DdlProgress>> {
1222 Ok(vec![])
1223 }
1224
1225 async fn get_tables(
1226 &self,
1227 _table_ids: Vec<crate::catalog::TableId>,
1228 _include_dropped_tables: bool,
1229 ) -> RpcResult<HashMap<crate::catalog::TableId, Table>> {
1230 Ok(HashMap::new())
1231 }
1232
1233 async fn list_hummock_pinned_versions(&self) -> RpcResult<Vec<(WorkerId, HummockVersionId)>> {
1234 unimplemented!()
1235 }
1236
1237 async fn list_refresh_table_states(&self) -> RpcResult<Vec<RefreshTableState>> {
1238 unimplemented!()
1239 }
1240
1241 async fn get_hummock_current_version(&self) -> RpcResult<HummockVersion> {
1242 Ok(HummockVersion::default())
1243 }
1244
1245 async fn get_hummock_checkpoint_version(&self) -> RpcResult<HummockVersion> {
1246 unimplemented!()
1247 }
1248
1249 async fn list_version_deltas(&self) -> RpcResult<Vec<HummockVersionDelta>> {
1250 unimplemented!()
1251 }
1252
1253 async fn list_branched_objects(&self) -> RpcResult<Vec<BranchedObject>> {
1254 unimplemented!()
1255 }
1256
1257 async fn list_hummock_compaction_group_configs(&self) -> RpcResult<Vec<CompactionGroupInfo>> {
1258 unimplemented!()
1259 }
1260
1261 async fn list_hummock_active_write_limits(
1262 &self,
1263 ) -> RpcResult<HashMap<CompactionGroupId, WriteLimit>> {
1264 unimplemented!()
1265 }
1266
1267 async fn list_hummock_meta_configs(&self) -> RpcResult<HashMap<String, String>> {
1268 unimplemented!()
1269 }
1270
1271 async fn list_event_log(&self) -> RpcResult<Vec<EventLog>> {
1272 Ok(vec![])
1273 }
1274
1275 async fn list_compact_task_assignment(&self) -> RpcResult<Vec<CompactTaskAssignment>> {
1276 unimplemented!()
1277 }
1278
1279 async fn list_all_nodes(&self) -> RpcResult<Vec<WorkerNode>> {
1280 Ok(vec![])
1281 }
1282
1283 async fn list_compact_task_progress(&self) -> RpcResult<Vec<CompactTaskProgress>> {
1284 unimplemented!()
1285 }
1286
1287 async fn recover(&self) -> RpcResult<()> {
1288 unimplemented!()
1289 }
1290
1291 async fn backup_meta(&self, _remarks: Option<String>) -> RpcResult<u64> {
1292 unimplemented!()
1293 }
1294
1295 async fn get_backup_job_status(
1296 &self,
1297 _job_id: u64,
1298 ) -> RpcResult<(risingwave_pb::backup_service::BackupJobStatus, String)> {
1299 unimplemented!()
1300 }
1301
1302 async fn delete_meta_snapshot(&self, _snapshot_ids: &[u64]) -> RpcResult<()> {
1303 unimplemented!()
1304 }
1305
1306 async fn apply_throttle(
1307 &self,
1308 _throttle_target: PbThrottleTarget,
1309 _throttle_type: risingwave_pb::common::PbThrottleType,
1310 _id: u32,
1311 _rate_limit: Option<u32>,
1312 ) -> RpcResult<()> {
1313 unimplemented!()
1314 }
1315
1316 async fn alter_fragment_parallelism(
1317 &self,
1318 _fragment_ids: Vec<FragmentId>,
1319 _parallelism: Option<PbTableParallelism>,
1320 ) -> RpcResult<()> {
1321 unimplemented!()
1322 }
1323
1324 async fn get_cluster_recovery_status(&self) -> RpcResult<RecoveryStatus> {
1325 Ok(RecoveryStatus::StatusRunning)
1326 }
1327
1328 async fn get_cluster_limits(&self) -> RpcResult<Vec<ClusterLimit>> {
1329 Ok(vec![])
1330 }
1331
1332 async fn list_rate_limits(&self) -> RpcResult<Vec<RateLimitInfo>> {
1333 Ok(vec![])
1334 }
1335
1336 async fn list_cdc_progress(&self) -> RpcResult<HashMap<JobId, PbCdcProgress>> {
1337 Ok(HashMap::default())
1338 }
1339
1340 async fn get_meta_store_endpoint(&self) -> RpcResult<String> {
1341 unimplemented!()
1342 }
1343
1344 async fn alter_sink_props(
1345 &self,
1346 _sink_id: SinkId,
1347 _changed_props: BTreeMap<String, String>,
1348 _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1349 _connector_conn_ref: Option<ConnectionId>,
1350 ) -> RpcResult<()> {
1351 unimplemented!()
1352 }
1353
1354 async fn alter_iceberg_table_props(
1355 &self,
1356 _table_id: TableId,
1357 _sink_id: SinkId,
1358 _source_id: SourceId,
1359 _changed_props: BTreeMap<String, String>,
1360 _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1361 _connector_conn_ref: Option<ConnectionId>,
1362 ) -> RpcResult<()> {
1363 unimplemented!()
1364 }
1365
1366 async fn alter_source_connector_props(
1367 &self,
1368 _source_id: SourceId,
1369 _changed_props: BTreeMap<String, String>,
1370 _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1371 _connector_conn_ref: Option<ConnectionId>,
1372 ) -> RpcResult<()> {
1373 unimplemented!()
1374 }
1375
1376 async fn alter_connection_connector_props(
1377 &self,
1378 _connection_id: u32,
1379 _changed_props: BTreeMap<String, String>,
1380 _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1381 ) -> RpcResult<()> {
1382 Ok(())
1383 }
1384
1385 async fn list_hosted_iceberg_tables(&self) -> RpcResult<Vec<IcebergTable>> {
1386 unimplemented!()
1387 }
1388
1389 async fn list_iceberg_compaction_status(&self) -> RpcResult<Vec<IcebergCompactionStatus>> {
1390 Ok(vec![])
1391 }
1392
1393 async fn get_fragment_by_id(
1394 &self,
1395 _fragment_id: FragmentId,
1396 ) -> RpcResult<Option<FragmentDistribution>> {
1397 unimplemented!()
1398 }
1399
1400 async fn get_fragment_vnodes(
1401 &self,
1402 _fragment_id: FragmentId,
1403 ) -> RpcResult<Vec<(ActorId, Vec<u32>)>> {
1404 unimplemented!()
1405 }
1406
1407 async fn get_actor_vnodes(&self, _actor_id: ActorId) -> RpcResult<Vec<u32>> {
1408 unimplemented!()
1409 }
1410
1411 fn worker_id(&self) -> WorkerId {
1412 0.into()
1413 }
1414
1415 async fn set_sync_log_store_aligned(&self, _job_id: JobId, _aligned: bool) -> RpcResult<()> {
1416 Ok(())
1417 }
1418
1419 async fn compact_iceberg_table(&self, _sink_id: SinkId) -> RpcResult<u64> {
1420 Ok(1)
1421 }
1422
1423 async fn expire_iceberg_table_snapshots(&self, _sink_id: SinkId) -> RpcResult<()> {
1424 Ok(())
1425 }
1426
1427 async fn refresh(&self, _request: RefreshRequest) -> RpcResult<RefreshResponse> {
1428 Ok(RefreshResponse { status: None })
1429 }
1430
1431 fn cluster_id(&self) -> &str {
1432 "test-cluster-uuid"
1433 }
1434
1435 async fn list_unmigrated_tables(&self) -> RpcResult<HashMap<crate::catalog::TableId, String>> {
1436 unimplemented!()
1437 }
1438
1439 async fn get_hummock_table_change_log(
1440 &self,
1441 _start_epoch_inclusive: Option<u64>,
1442 _end_epoch_inclusive: Option<u64>,
1443 _table_ids: Option<HashSet<TableId>>,
1444 _exclude_empty: bool,
1445 _limit: Option<u32>,
1446 ) -> RpcResult<TableChangeLogs> {
1447 Ok(HashMap::default())
1448 }
1449}
1450
1451#[cfg(test)]
1452pub static PROTO_FILE_DATA: &str = r#"
1453 syntax = "proto3";
1454 package test;
1455 message TestRecord {
1456 int32 id = 1;
1457 Country country = 3;
1458 int64 zipcode = 4;
1459 float rate = 5;
1460 }
1461 message TestRecordAlterType {
1462 string id = 1;
1463 Country country = 3;
1464 int32 zipcode = 4;
1465 float rate = 5;
1466 }
1467 message TestRecordExt {
1468 int32 id = 1;
1469 Country country = 3;
1470 int64 zipcode = 4;
1471 float rate = 5;
1472 string name = 6;
1473 }
1474 message Country {
1475 string address = 1;
1476 City city = 2;
1477 string zipcode = 3;
1478 }
1479 message City {
1480 string address = 1;
1481 string zipcode = 2;
1482 }"#;
1483
1484pub fn create_proto_file(proto_data: &str) -> NamedTempFile {
1487 let in_file = Builder::new()
1488 .prefix("temp")
1489 .suffix(".proto")
1490 .rand_bytes(8)
1491 .tempfile()
1492 .unwrap();
1493
1494 let out_file = Builder::new()
1495 .prefix("temp")
1496 .suffix(".pb")
1497 .rand_bytes(8)
1498 .tempfile()
1499 .unwrap();
1500
1501 let mut file = in_file.as_file();
1502 file.write_all(proto_data.as_ref())
1503 .expect("writing binary to test file");
1504 file.flush().expect("flush temp file failed");
1505 let include_path = in_file
1506 .path()
1507 .parent()
1508 .unwrap()
1509 .to_string_lossy()
1510 .into_owned();
1511 let out_path = out_file.path().to_string_lossy().into_owned();
1512 let in_path = in_file.path().to_string_lossy().into_owned();
1513 let mut compile = std::process::Command::new("protoc");
1514
1515 let out = compile
1516 .arg("--include_imports")
1517 .arg("-I")
1518 .arg(include_path)
1519 .arg(format!("--descriptor_set_out={}", out_path))
1520 .arg(in_path)
1521 .output()
1522 .expect("failed to compile proto");
1523 if !out.status.success() {
1524 panic!("compile proto failed \n output: {:?}", out);
1525 }
1526 out_file
1527}