1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::io::Write;
17use std::net::{IpAddr, Ipv4Addr, SocketAddr};
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU32, Ordering};
20
21use futures_async_stream::for_await;
22use parking_lot::RwLock;
23use pgwire::net::{Address, AddressRef};
24use pgwire::pg_response::StatementType;
25use pgwire::pg_server::{SessionId, SessionManager, UserAuthenticator};
26use pgwire::types::Row;
27use risingwave_common::catalog::{
28 AlterDatabaseParam, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER,
29 DEFAULT_SUPER_USER_FOR_ADMIN, DEFAULT_SUPER_USER_FOR_ADMIN_ID, DEFAULT_SUPER_USER_ID,
30 FunctionId, IndexId, NON_RESERVED_USER_ID, ObjectId, PG_CATALOG_SCHEMA_NAME,
31 RW_CATALOG_SCHEMA_NAME, TableId,
32};
33use risingwave_common::config::FrontendConfig;
34use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat};
35use risingwave_common::id::{ConnectionId, JobId, SourceId, SubscriptionId, ViewId, WorkerId};
36use risingwave_common::session_config::SessionConfig;
37use risingwave_common::system_param::AdaptiveParallelismStrategy;
38use risingwave_common::system_param::reader::SystemParamsReader;
39use risingwave_common::util::cluster_limit::ClusterLimit;
40use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
41use risingwave_hummock_sdk::change_log::TableChangeLogs;
42use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
43use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId, INVALID_VERSION_ID};
44use risingwave_pb::backup_service::MetaSnapshotMetadata;
45use risingwave_pb::catalog::{
46 PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbStreamJobStatus,
47 PbSubscription, PbTable, PbView, Table,
48};
49use risingwave_pb::common::{PbObjectType, WorkerNode};
50use risingwave_pb::ddl_service::alter_owner_request::Object;
51use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
52use risingwave_pb::ddl_service::{
53 DdlProgress, PbTableJobType, TableJobType, alter_name_request, alter_set_schema_request,
54 alter_swap_rename_request, create_connection_request, streaming_job_resource_type,
55};
56use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig as PbMutableConfig;
57use risingwave_pb::hummock::write_limits::WriteLimit;
58use risingwave_pb::hummock::{
59 BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
60};
61use risingwave_pb::id::ActorId;
62use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
63use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
64use risingwave_pb::meta::list_actor_states_response::ActorState;
65use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
66use risingwave_pb::meta::list_iceberg_compaction_status_response::IcebergCompactionStatus;
67use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
68use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
69use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
70use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
71use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
72use risingwave_pb::meta::{
73 EventLog, FragmentDistribution, ObjectDependency as PbObjectDependency, PbTableParallelism,
74 PbThrottleTarget, RecoveryStatus, RefreshRequest, RefreshResponse, SystemParams,
75 list_sink_log_store_tables_response,
76};
77use risingwave_pb::secret::PbSecretRef;
78use risingwave_pb::stream_plan::StreamFragmentGraph;
79use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
80use risingwave_pb::user::update_user_request::UpdateField;
81use risingwave_pb::user::{GrantPrivilege, UserInfo};
82use risingwave_rpc_client::error::Result as RpcResult;
83use tempfile::{Builder, NamedTempFile};
84
85use crate::FrontendOpts;
86use crate::catalog::catalog_service::CatalogWriter;
87use crate::catalog::root_catalog::Catalog;
88use crate::catalog::{DatabaseId, FragmentId, SchemaId, SecretId, SinkId};
89use crate::error::{ErrorCode, Result, RwError};
90use crate::handler::RwPgResponse;
91use crate::meta_client::FrontendMetaClient;
92use crate::scheduler::HummockSnapshotManagerRef;
93use crate::session::{AuthContext, FrontendEnv, SessionImpl};
94use crate::user::UserId;
95use crate::user::user_manager::UserInfoManager;
96use crate::user::user_service::UserInfoWriter;
97
/// An in-process frontend handle that bundles the frontend options with the
/// environment used to create sessions.
pub struct LocalFrontend {
    /// Options this frontend was constructed with.
    pub opts: FrontendOpts,
    /// Environment cloned into every session created by this frontend.
    env: FrontendEnv,
}
103
/// `SessionManager` implementation for the in-process frontend: only `connect` is
/// functional, every other entry point panics via `unreachable!()`.
impl SessionManager for LocalFrontend {
    type Error = RwError;
    type Session = SessionImpl;

    /// Not supported here; panics if called.
    fn create_dummy_session(
        &self,
        _database_id: DatabaseId,
    ) -> std::result::Result<Arc<Self::Session>, Self::Error> {
        unreachable!()
    }

    /// Ignores every argument and returns a fresh session built by `session_ref`.
    fn connect(
        &self,
        _database: &str,
        _user_name: &str,
        _peer_addr: AddressRef,
    ) -> std::result::Result<Arc<Self::Session>, Self::Error> {
        Ok(self.session_ref())
    }

    /// Not supported here; panics if called.
    fn cancel_queries_in_session(&self, _session_id: SessionId) {
        unreachable!()
    }

    /// Not supported here; panics if called.
    fn cancel_creating_jobs_in_session(&self, _session_id: SessionId) {
        unreachable!()
    }

    /// Not supported here; panics if called.
    fn end_session(&self, _session: &Self::Session) {
        unreachable!()
    }
}
136
137impl LocalFrontend {
138 #[expect(clippy::unused_async)]
139 pub async fn new(opts: FrontendOpts) -> Self {
140 let env = FrontendEnv::mock();
141 Self { opts, env }
142 }
143
144 #[expect(clippy::unused_async)]
145 pub async fn with_frontend_config(opts: FrontendOpts, frontend_config: FrontendConfig) -> Self {
146 let mut env = FrontendEnv::mock();
147 env.set_frontend_config_for_test(frontend_config);
148 Self { opts, env }
149 }
150
151 pub async fn run_sql(
152 &self,
153 sql: impl Into<String>,
154 ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
155 let sql: Arc<str> = Arc::from(sql.into());
156 Box::pin(self.session_ref().run_statement(sql, vec![])).await
157 }
158
159 pub async fn run_sql_with_session(
160 &self,
161 session_ref: Arc<SessionImpl>,
162 sql: impl Into<String>,
163 ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
164 let sql: Arc<str> = Arc::from(sql.into());
165 Box::pin(session_ref.run_statement(sql, vec![])).await
166 }
167
168 pub async fn run_user_sql(
169 &self,
170 sql: impl Into<String>,
171 database: String,
172 user_name: String,
173 user_id: UserId,
174 ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
175 let sql: Arc<str> = Arc::from(sql.into());
176 Box::pin(
177 self.session_user_ref(database, user_name, user_id)
178 .run_statement(sql, vec![]),
179 )
180 .await
181 }
182
183 pub async fn query_formatted_result(&self, sql: impl Into<String>) -> Vec<String> {
184 let mut rsp = self.run_sql(sql).await.unwrap();
185 let mut res = vec![];
186 #[for_await]
187 for row_set in rsp.values_stream() {
188 for row in row_set.unwrap() {
189 res.push(format!("{:?}", row));
190 }
191 }
192 res
193 }
194
195 pub async fn get_explain_output(&self, sql: impl Into<String>) -> String {
196 let mut rsp = self.run_sql(sql).await.unwrap();
197 assert_eq!(rsp.stmt_type(), StatementType::EXPLAIN);
198 let mut res = String::new();
199 #[for_await]
200 for row_set in rsp.values_stream() {
201 for row in row_set.unwrap() {
202 let row: Row = row;
203 let row = row.values()[0].as_ref().unwrap();
204 res += std::str::from_utf8(row).unwrap();
205 res += "\n";
206 }
207 }
208 res
209 }
210
211 pub fn session_ref(&self) -> Arc<SessionImpl> {
213 self.session_user_ref(
214 DEFAULT_DATABASE_NAME.to_owned(),
215 DEFAULT_SUPER_USER.to_owned(),
216 DEFAULT_SUPER_USER_ID,
217 )
218 }
219
220 pub fn session_user_ref(
221 &self,
222 database: String,
223 user_name: String,
224 user_id: UserId,
225 ) -> Arc<SessionImpl> {
226 Arc::new(SessionImpl::new(
227 self.env.clone(),
228 AuthContext::new(database, user_name, user_id),
229 UserAuthenticator::None,
230 (0, 0),
232 Address::Tcp(SocketAddr::new(
233 IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
234 6666,
235 ))
236 .into(),
237 Default::default(),
238 ))
239 }
240}
241
242pub async fn get_explain_output(mut rsp: RwPgResponse) -> String {
243 if rsp.stmt_type() != StatementType::EXPLAIN {
244 panic!("RESPONSE INVALID: {rsp:?}");
245 }
246 let mut res = String::new();
247 #[for_await]
248 for row_set in rsp.values_stream() {
249 for row in row_set.unwrap() {
250 let row: Row = row;
251 let row = row.values()[0].as_ref().unwrap();
252 res += std::str::from_utf8(row).unwrap();
253 res += "\n";
254 }
255 }
256 res
257}
258
/// A `CatalogWriter` that applies changes directly to a shared in-memory catalog.
pub struct MockCatalogWriter {
    /// Shared catalog that the writer methods read and mutate.
    catalog: Arc<RwLock<Catalog>>,
    /// Counter backing `gen_id`.
    id: AtomicU32,
    /// Raw object id -> owning schema, filled by the `add_table_or_*_id` helpers.
    table_id_to_schema_id: RwLock<HashMap<u32, SchemaId>>,
    /// Schema id -> owning database, filled by `add_schema_id`.
    schema_id_to_database_id: RwLock<HashMap<SchemaId, DatabaseId>>,
    /// Receives `add_table_for_test` for each table created through
    /// `create_materialized_view`.
    hummock_snapshot_manager: HummockSnapshotManagerRef,
}
266
267#[async_trait::async_trait]
268impl CatalogWriter for MockCatalogWriter {
269 async fn create_database(
270 &self,
271 db_name: &str,
272 owner: UserId,
273 resource_group: &str,
274 barrier_interval_ms: Option<u32>,
275 checkpoint_frequency: Option<u64>,
276 ) -> Result<()> {
277 let database_id = DatabaseId::new(self.gen_id());
278 self.catalog.write().create_database(&PbDatabase {
279 name: db_name.to_owned(),
280 id: database_id,
281 owner,
282 resource_group: resource_group.to_owned(),
283 barrier_interval_ms,
284 checkpoint_frequency,
285 });
286 self.create_schema(database_id, DEFAULT_SCHEMA_NAME, owner)
287 .await?;
288 self.create_schema(database_id, PG_CATALOG_SCHEMA_NAME, owner)
289 .await?;
290 self.create_schema(database_id, RW_CATALOG_SCHEMA_NAME, owner)
291 .await?;
292 Ok(())
293 }
294
295 async fn create_schema(
296 &self,
297 db_id: DatabaseId,
298 schema_name: &str,
299 owner: UserId,
300 ) -> Result<()> {
301 let id = self.gen_id();
302 self.catalog.write().create_schema(&PbSchema {
303 id,
304 name: schema_name.to_owned(),
305 database_id: db_id,
306 owner,
307 });
308 self.add_schema_id(id, db_id);
309 Ok(())
310 }
311
312 async fn create_materialized_view(
313 &self,
314 mut table: PbTable,
315 _graph: StreamFragmentGraph,
316 dependencies: HashSet<ObjectId>,
317 _resource_type: streaming_job_resource_type::ResourceType,
318 _if_not_exists: bool,
319 _refresh_interval_sec: Option<u64>,
320 ) -> Result<()> {
321 table.id = self.gen_id();
322 table.stream_job_status = PbStreamJobStatus::Created as _;
323 table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
324 self.catalog.write().create_table(&table);
325 self.add_table_or_source_id(table.id.as_raw_id(), table.schema_id, table.database_id);
326 self.insert_object_dependencies(table.id.as_object_id(), dependencies);
327 self.hummock_snapshot_manager.add_table_for_test(table.id);
328 Ok(())
329 }
330
331 async fn replace_materialized_view(
332 &self,
333 mut table: PbTable,
334 _graph: StreamFragmentGraph,
335 ) -> Result<()> {
336 table.stream_job_status = PbStreamJobStatus::Created as _;
337 assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
338 self.catalog.write().update_table(&table);
339 Ok(())
340 }
341
342 async fn create_view(&self, mut view: PbView, dependencies: HashSet<ObjectId>) -> Result<()> {
343 view.id = self.gen_id();
344 self.catalog.write().create_view(&view);
345 self.add_table_or_source_id(view.id.as_raw_id(), view.schema_id, view.database_id);
346 self.insert_object_dependencies(view.id.as_object_id(), dependencies);
347 Ok(())
348 }
349
350 async fn create_table(
351 &self,
352 source: Option<PbSource>,
353 mut table: PbTable,
354 graph: StreamFragmentGraph,
355 _job_type: PbTableJobType,
356 if_not_exists: bool,
357 dependencies: HashSet<ObjectId>,
358 ) -> Result<()> {
359 if let Some(source) = source {
360 let source_id = self.create_source_inner(source)?;
361 table.optional_associated_source_id = Some(source_id.into());
362 }
363 self.create_materialized_view(
364 table,
365 graph,
366 dependencies,
367 streaming_job_resource_type::ResourceType::Regular(true),
368 if_not_exists,
369 None,
370 )
371 .await?;
372 Ok(())
373 }
374
375 async fn replace_table(
376 &self,
377 _source: Option<PbSource>,
378 mut table: PbTable,
379 _graph: StreamFragmentGraph,
380 _job_type: TableJobType,
381 ) -> Result<()> {
382 table.stream_job_status = PbStreamJobStatus::Created as _;
383 assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
384 self.catalog.write().update_table(&table);
385 Ok(())
386 }
387
388 async fn replace_source(&self, source: PbSource, _graph: StreamFragmentGraph) -> Result<()> {
389 self.catalog.write().update_source(&source);
390 Ok(())
391 }
392
393 async fn create_source(
394 &self,
395 source: PbSource,
396 _graph: Option<StreamFragmentGraph>,
397 _if_not_exists: bool,
398 ) -> Result<()> {
399 self.create_source_inner(source).map(|_| ())
400 }
401
402 async fn create_sink(
403 &self,
404 sink: PbSink,
405 graph: StreamFragmentGraph,
406 dependencies: HashSet<ObjectId>,
407 _resource_type: streaming_job_resource_type::ResourceType,
408 _if_not_exists: bool,
409 ) -> Result<()> {
410 let sink_id = self.create_sink_inner(sink, graph)?;
411 self.insert_object_dependencies(sink_id.as_object_id(), dependencies);
412 Ok(())
413 }
414
    /// Creates `subscription` by delegating to `create_subscription_inner`.
    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
        self.create_subscription_inner(subscription)
    }
418
419 async fn create_index(
420 &self,
421 mut index: PbIndex,
422 mut index_table: PbTable,
423 _graph: StreamFragmentGraph,
424 _resource_type: streaming_job_resource_type::ResourceType,
425 _if_not_exists: bool,
426 ) -> Result<()> {
427 index_table.id = self.gen_id();
428 index_table.stream_job_status = PbStreamJobStatus::Created as _;
429 index_table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
430 self.catalog.write().create_table(&index_table);
431 self.add_table_or_index_id(
432 index_table.id.as_raw_id(),
433 index_table.schema_id,
434 index_table.database_id,
435 );
436
437 index.id = index_table.id.as_raw_id().into();
438 index.index_table_id = index_table.id;
439 self.catalog.write().create_index(&index);
440 Ok(())
441 }
442
    /// Not supported by this mock; panics via `unreachable!()` if called.
    async fn create_function(&self, _function: PbFunction) -> Result<()> {
        unreachable!()
    }
446
    /// Not supported by this mock; panics via `unreachable!()` if called.
    async fn create_connection(
        &self,
        _connection_name: String,
        _database_id: DatabaseId,
        _schema_id: SchemaId,
        _owner_id: UserId,
        _connection: create_connection_request::Payload,
    ) -> Result<()> {
        unreachable!()
    }
457
    /// Not supported by this mock; panics via `unreachable!()` if called.
    async fn create_secret(
        &self,
        _secret_name: String,
        _database_id: DatabaseId,
        _schema_id: SchemaId,
        _owner_id: UserId,
        _payload: Vec<u8>,
    ) -> Result<()> {
        unreachable!()
    }
468
    /// Not supported by this mock; panics via `unreachable!()` if called.
    async fn comment_on(&self, _comment: PbComment) -> Result<()> {
        unreachable!()
    }
472
473 async fn drop_table(
474 &self,
475 source_id: Option<SourceId>,
476 table_id: TableId,
477 cascade: bool,
478 ) -> Result<()> {
479 if cascade {
480 return Err(ErrorCode::NotSupported(
481 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
482 "use drop instead".to_owned(),
483 )
484 .into());
485 }
486 if let Some(source_id) = source_id {
487 self.drop_table_or_source_id(source_id.as_raw_id());
488 }
489 let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
490 let indexes =
491 self.catalog
492 .read()
493 .get_all_indexes_related_to_object(database_id, schema_id, table_id);
494 for index in indexes {
495 self.drop_index(index.id, cascade).await?;
496 }
497 self.catalog
498 .write()
499 .drop_table(database_id, schema_id, table_id);
500 if let Some(source_id) = source_id {
501 self.catalog
502 .write()
503 .drop_source(database_id, schema_id, source_id);
504 }
505 Ok(())
506 }
507
    /// Not supported by this mock; panics via `unreachable!()` if called.
    async fn drop_view(&self, _view_id: ViewId, _cascade: bool) -> Result<()> {
        unreachable!()
    }
511
512 async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
513 if cascade {
514 return Err(ErrorCode::NotSupported(
515 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
516 "use drop instead".to_owned(),
517 )
518 .into());
519 }
520 let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
521 let indexes =
522 self.catalog
523 .read()
524 .get_all_indexes_related_to_object(database_id, schema_id, table_id);
525 for index in indexes {
526 self.drop_index(index.id, cascade).await?;
527 }
528 self.catalog
529 .write()
530 .drop_table(database_id, schema_id, table_id);
531 Ok(())
532 }
533
534 async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()> {
535 if cascade {
536 return Err(ErrorCode::NotSupported(
537 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
538 "use drop instead".to_owned(),
539 )
540 .into());
541 }
542 let (database_id, schema_id) = self.drop_table_or_source_id(source_id.as_raw_id());
543 self.catalog
544 .write()
545 .drop_source(database_id, schema_id, source_id);
546 Ok(())
547 }
548
    /// No-op in this mock: always returns `Ok(())`.
    async fn reset_source(&self, _source_id: SourceId) -> Result<()> {
        Ok(())
    }
552
553 async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()> {
554 if cascade {
555 return Err(ErrorCode::NotSupported(
556 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
557 "use drop instead".to_owned(),
558 )
559 .into());
560 }
561 let (database_id, schema_id) = self.drop_table_or_sink_id(sink_id.as_raw_id());
562 self.catalog
563 .write()
564 .drop_sink(database_id, schema_id, sink_id);
565 Ok(())
566 }
567
568 async fn drop_subscription(
569 &self,
570 subscription_id: SubscriptionId,
571 cascade: bool,
572 ) -> Result<()> {
573 if cascade {
574 return Err(ErrorCode::NotSupported(
575 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
576 "use drop instead".to_owned(),
577 )
578 .into());
579 }
580 let (database_id, schema_id) =
581 self.drop_table_or_subscription_id(subscription_id.as_raw_id());
582 self.catalog
583 .write()
584 .drop_subscription(database_id, schema_id, subscription_id);
585 Ok(())
586 }
587
588 async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
589 if cascade {
590 return Err(ErrorCode::NotSupported(
591 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
592 "use drop instead".to_owned(),
593 )
594 .into());
595 }
596 let &schema_id = self
597 .table_id_to_schema_id
598 .read()
599 .get(&index_id.as_raw_id())
600 .unwrap();
601 let database_id = self.get_database_id_by_schema(schema_id);
602
603 let index = {
604 let catalog_reader = self.catalog.read();
605 let schema_catalog = catalog_reader
606 .get_schema_by_id(database_id, schema_id)
607 .unwrap();
608 schema_catalog.get_index_by_id(index_id).unwrap().clone()
609 };
610
611 let index_table_id = index.index_table().id;
612 let (database_id, schema_id) = self.drop_table_or_index_id(index_id.as_raw_id());
613 self.catalog
614 .write()
615 .drop_index(database_id, schema_id, index_id);
616 self.catalog
617 .write()
618 .drop_table(database_id, schema_id, index_table_id);
619 Ok(())
620 }
621
    /// Not supported by this mock; panics via `unreachable!()` if called.
    async fn drop_function(&self, _function_id: FunctionId, _cascade: bool) -> Result<()> {
        unreachable!()
    }
625
    /// Not supported by this mock; panics via `unreachable!()` if called.
    async fn drop_connection(&self, _connection_id: ConnectionId, _cascade: bool) -> Result<()> {
        unreachable!()
    }
629
    /// Not supported by this mock; panics via `unreachable!()` if called.
    async fn drop_secret(&self, _secret_id: SecretId, _cascade: bool) -> Result<()> {
        unreachable!()
    }
633
634 async fn drop_database(&self, database_id: DatabaseId) -> Result<()> {
635 self.catalog.write().drop_database(database_id);
636 Ok(())
637 }
638
639 async fn drop_schema(&self, schema_id: SchemaId, _cascade: bool) -> Result<()> {
640 let database_id = self.drop_schema_id(schema_id);
641 self.catalog.write().drop_schema(database_id, schema_id);
642 Ok(())
643 }
644
645 async fn alter_name(
646 &self,
647 object_id: alter_name_request::Object,
648 object_name: &str,
649 ) -> Result<()> {
650 match object_id {
651 alter_name_request::Object::TableId(table_id) => {
652 self.catalog
653 .write()
654 .alter_table_name_by_id(table_id, object_name);
655 Ok(())
656 }
657 _ => {
658 unimplemented!()
659 }
660 }
661 }
662
663 async fn alter_source(&self, source: PbSource) -> Result<()> {
664 self.catalog.write().update_source(&source);
665 Ok(())
666 }
667
668 async fn alter_owner(&self, object: Object, owner_id: UserId) -> Result<()> {
669 for database in self.catalog.read().iter_databases() {
670 for schema in database.iter_schemas() {
671 match object {
672 Object::TableId(table_id) => {
673 if let Some(table) = schema.get_created_table_by_id(TableId::from(table_id))
674 {
675 let mut pb_table = table.to_prost();
676 pb_table.owner = owner_id;
677 self.catalog.write().update_table(&pb_table);
678 return Ok(());
679 }
680 }
681 _ => unreachable!(),
682 }
683 }
684 }
685
686 Err(ErrorCode::ItemNotFound(format!("object not found: {:?}", object)).into())
687 }
688
689 async fn alter_subscription_retention(
690 &self,
691 subscription_id: SubscriptionId,
692 retention_seconds: u64,
693 definition: String,
694 ) -> Result<()> {
695 for database in self.catalog.read().iter_databases() {
696 for schema in database.iter_schemas() {
697 if let Some(subscription) = schema.get_subscription_by_id(subscription_id) {
698 let mut pb_subscription = subscription.to_proto();
699 pb_subscription.retention_seconds = retention_seconds;
700 pb_subscription.definition = definition;
701 self.catalog.write().update_subscription(&pb_subscription);
702 return Ok(());
703 }
704 }
705 }
706
707 Err(
708 ErrorCode::ItemNotFound(format!("subscription not found: {:?}", subscription_id))
709 .into(),
710 )
711 }
712
713 async fn alter_set_schema(
714 &self,
715 object: alter_set_schema_request::Object,
716 new_schema_id: SchemaId,
717 ) -> Result<()> {
718 match object {
719 alter_set_schema_request::Object::TableId(table_id) => {
720 let mut pb_table = {
721 let reader = self.catalog.read();
722 let table = reader.get_any_table_by_id(table_id)?.to_owned();
723 table.to_prost()
724 };
725 pb_table.schema_id = new_schema_id;
726 self.catalog.write().update_table(&pb_table);
727 self.table_id_to_schema_id
728 .write()
729 .insert(table_id.as_raw_id(), new_schema_id);
730 Ok(())
731 }
732 _ => unreachable!(),
733 }
734 }
735
    /// Not yet implemented in this mock; panics via `todo!()` if called.
    async fn alter_parallelism(
        &self,
        _job_id: JobId,
        _parallelism: PbTableParallelism,
        _adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }
745
    /// Not yet implemented in this mock; panics via `todo!()` if called.
    async fn alter_backfill_parallelism(
        &self,
        _job_id: JobId,
        _parallelism: Option<PbTableParallelism>,
        _adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }
755
    /// Not yet implemented in this mock; panics via `todo!()` if called.
    async fn alter_config(
        &self,
        _job_id: JobId,
        _entries_to_add: HashMap<String, String>,
        _keys_to_remove: Vec<String>,
    ) -> Result<()> {
        todo!()
    }
764
    /// Not yet implemented in this mock; panics via `todo!()` if called.
    async fn alter_swap_rename(&self, _object: alter_swap_rename_request::Object) -> Result<()> {
        todo!()
    }
768
    /// Not supported by this mock; panics via `unreachable!()` if called.
    async fn alter_secret(
        &self,
        _secret_id: SecretId,
        _secret_name: String,
        _database_id: DatabaseId,
        _schema_id: SchemaId,
        _owner_id: UserId,
        _payload: Vec<u8>,
    ) -> Result<()> {
        unreachable!()
    }
780
    /// Not yet implemented in this mock; panics via `todo!()` if called.
    async fn alter_resource_group(
        &self,
        _job_id: JobId,
        _resource_group: Option<String>,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }
789
790 async fn alter_database_resource_group(
791 &self,
792 database_id: DatabaseId,
793 resource_group: Option<String>,
794 _deferred: bool,
795 ) -> Result<()> {
796 let mut pb_database = {
797 let reader = self.catalog.read();
798 let database = reader.get_database_by_id(database_id)?.to_owned();
799 database.to_prost()
800 };
801 pb_database.resource_group =
802 resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned());
803 self.catalog.write().update_database(&pb_database);
804 Ok(())
805 }
806
807 async fn alter_database_param(
808 &self,
809 database_id: DatabaseId,
810 param: AlterDatabaseParam,
811 ) -> Result<()> {
812 let mut pb_database = {
813 let reader = self.catalog.read();
814 let database = reader.get_database_by_id(database_id)?.to_owned();
815 database.to_prost()
816 };
817 match param {
818 AlterDatabaseParam::BarrierIntervalMs(interval) => {
819 pb_database.barrier_interval_ms = interval;
820 }
821 AlterDatabaseParam::CheckpointFrequency(frequency) => {
822 pb_database.checkpoint_frequency = frequency;
823 }
824 }
825 self.catalog.write().update_database(&pb_database);
826 Ok(())
827 }
828
    /// Not yet implemented in this mock; panics via `todo!()` if called.
    async fn create_iceberg_table(
        &self,
        _table_job_info: PbTableJobInfo,
        _sink_job_info: PbSinkJobInfo,
        _iceberg_source: PbSource,
        _if_not_exists: bool,
    ) -> Result<()> {
        todo!()
    }
838
    /// No-op in this mock: always returns `Ok(())` immediately.
    async fn wait(&self, _job_id: Option<JobId>) -> Result<()> {
        Ok(())
    }
842}
843
844impl MockCatalogWriter {
845 pub fn new(
846 catalog: Arc<RwLock<Catalog>>,
847 hummock_snapshot_manager: HummockSnapshotManagerRef,
848 ) -> Self {
849 catalog.write().create_database(&PbDatabase {
850 id: 0.into(),
851 name: DEFAULT_DATABASE_NAME.to_owned(),
852 owner: DEFAULT_SUPER_USER_ID,
853 resource_group: DEFAULT_RESOURCE_GROUP.to_owned(),
854 barrier_interval_ms: None,
855 checkpoint_frequency: None,
856 });
857 catalog.write().create_schema(&PbSchema {
858 id: 1.into(),
859 name: DEFAULT_SCHEMA_NAME.to_owned(),
860 database_id: 0.into(),
861 owner: DEFAULT_SUPER_USER_ID,
862 });
863 catalog.write().create_schema(&PbSchema {
864 id: 2.into(),
865 name: PG_CATALOG_SCHEMA_NAME.to_owned(),
866 database_id: 0.into(),
867 owner: DEFAULT_SUPER_USER_ID,
868 });
869 catalog.write().create_schema(&PbSchema {
870 id: 3.into(),
871 name: RW_CATALOG_SCHEMA_NAME.to_owned(),
872 database_id: 0.into(),
873 owner: DEFAULT_SUPER_USER_ID,
874 });
875 let mut map: HashMap<SchemaId, DatabaseId> = HashMap::new();
876 map.insert(1_u32.into(), 0_u32.into());
877 map.insert(2_u32.into(), 0_u32.into());
878 map.insert(3_u32.into(), 0_u32.into());
879 Self {
880 catalog,
881 id: AtomicU32::new(3),
882 table_id_to_schema_id: Default::default(),
883 schema_id_to_database_id: RwLock::new(map),
884 hummock_snapshot_manager,
885 }
886 }
887
888 fn gen_id<T: From<u32>>(&self) -> T {
889 (self.id.fetch_add(1, Ordering::SeqCst) + 1).into()
891 }
892
893 fn add_table_or_source_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
894 self.table_id_to_schema_id
895 .write()
896 .insert(table_id, schema_id);
897 }
898
899 fn drop_table_or_source_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
900 let schema_id = self
901 .table_id_to_schema_id
902 .write()
903 .remove(&table_id)
904 .unwrap();
905 (self.get_database_id_by_schema(schema_id), schema_id)
906 }
907
908 fn add_table_or_sink_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
909 self.table_id_to_schema_id
910 .write()
911 .insert(table_id, schema_id);
912 }
913
914 fn add_table_or_subscription_id(
915 &self,
916 table_id: u32,
917 schema_id: SchemaId,
918 _database_id: DatabaseId,
919 ) {
920 self.table_id_to_schema_id
921 .write()
922 .insert(table_id, schema_id);
923 }
924
925 fn add_table_or_index_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
926 self.table_id_to_schema_id
927 .write()
928 .insert(table_id, schema_id);
929 }
930
931 fn drop_table_or_sink_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
932 let schema_id = self
933 .table_id_to_schema_id
934 .write()
935 .remove(&table_id)
936 .unwrap();
937 (self.get_database_id_by_schema(schema_id), schema_id)
938 }
939
940 fn drop_table_or_subscription_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
941 let schema_id = self
942 .table_id_to_schema_id
943 .write()
944 .remove(&table_id)
945 .unwrap();
946 (self.get_database_id_by_schema(schema_id), schema_id)
947 }
948
949 fn drop_table_or_index_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
950 let schema_id = self
951 .table_id_to_schema_id
952 .write()
953 .remove(&table_id)
954 .unwrap();
955 (self.get_database_id_by_schema(schema_id), schema_id)
956 }
957
958 fn add_schema_id(&self, schema_id: SchemaId, database_id: DatabaseId) {
959 self.schema_id_to_database_id
960 .write()
961 .insert(schema_id, database_id);
962 }
963
964 fn drop_schema_id(&self, schema_id: SchemaId) -> DatabaseId {
965 self.schema_id_to_database_id
966 .write()
967 .remove(&schema_id)
968 .unwrap()
969 }
970
971 fn create_source_inner(&self, mut source: PbSource) -> Result<SourceId> {
972 source.id = self.gen_id();
973 self.catalog.write().create_source(&source);
974 self.add_table_or_source_id(source.id.as_raw_id(), source.schema_id, source.database_id);
975 Ok(source.id)
976 }
977
978 fn create_sink_inner(&self, mut sink: PbSink, _graph: StreamFragmentGraph) -> Result<SinkId> {
979 sink.id = self.gen_id();
980 sink.stream_job_status = PbStreamJobStatus::Created as _;
981 self.catalog.write().create_sink(&sink);
982 self.add_table_or_sink_id(sink.id.as_raw_id(), sink.schema_id, sink.database_id);
983 Ok(sink.id)
984 }
985
986 fn create_subscription_inner(&self, mut subscription: PbSubscription) -> Result<()> {
987 subscription.id = self.gen_id();
988 self.catalog.write().create_subscription(&subscription);
989 self.add_table_or_subscription_id(
990 subscription.id.as_raw_id(),
991 subscription.schema_id,
992 subscription.database_id,
993 );
994 Ok(())
995 }
996
997 fn get_database_id_by_schema(&self, schema_id: SchemaId) -> DatabaseId {
998 *self
999 .schema_id_to_database_id
1000 .read()
1001 .get(&schema_id)
1002 .unwrap()
1003 }
1004
1005 fn get_object_type(&self, object_id: ObjectId) -> PbObjectType {
1006 let catalog = self.catalog.read();
1007 for database in catalog.iter_databases() {
1008 for schema in database.iter_schemas() {
1009 if let Some(table) = schema.get_created_table_by_id(object_id.as_table_id()) {
1010 return if table.is_mview() {
1011 PbObjectType::Mview
1012 } else {
1013 PbObjectType::Table
1014 };
1015 }
1016 if schema.get_source_by_id(object_id.as_source_id()).is_some() {
1017 return PbObjectType::Source;
1018 }
1019 if schema.get_view_by_id(object_id.as_view_id()).is_some() {
1020 return PbObjectType::View;
1021 }
1022 if schema.get_index_by_id(object_id.as_index_id()).is_some() {
1023 return PbObjectType::Index;
1024 }
1025 }
1026 }
1027 PbObjectType::Unspecified
1028 }
1029
1030 fn insert_object_dependencies(&self, object_id: ObjectId, dependencies: HashSet<ObjectId>) {
1031 if dependencies.is_empty() {
1032 return;
1033 }
1034 let dependencies = dependencies
1035 .into_iter()
1036 .map(|referenced_object_id| PbObjectDependency {
1037 object_id,
1038 referenced_object_id,
1039 referenced_object_type: self.get_object_type(referenced_object_id) as i32,
1040 })
1041 .collect();
1042 self.catalog
1043 .write()
1044 .insert_object_dependencies(dependencies);
1045 }
1046}
1047
/// A `UserInfoWriter` that applies changes directly to a shared in-memory
/// `UserInfoManager`.
pub struct MockUserInfoWriter {
    /// Atomic counter; presumably backs this writer's `gen_id`, which is defined
    /// outside this view — confirm there.
    id: AtomicU32,
    /// Shared user registry that the writer methods mutate.
    user_info: Arc<RwLock<UserInfoManager>>,
}
1052
1053#[async_trait::async_trait]
1054impl UserInfoWriter for MockUserInfoWriter {
1055 async fn create_user(&self, user: UserInfo) -> Result<()> {
1056 let mut user = user;
1057 user.id = self.gen_id().into();
1058 self.user_info.write().create_user(user);
1059 Ok(())
1060 }
1061
1062 async fn drop_user(&self, id: UserId) -> Result<()> {
1063 self.user_info.write().drop_user(id);
1064 Ok(())
1065 }
1066
1067 async fn update_user(
1068 &self,
1069 update_user: UserInfo,
1070 update_fields: Vec<UpdateField>,
1071 ) -> Result<()> {
1072 let mut lock = self.user_info.write();
1073 let id = update_user.get_id();
1074 let Some(old_name) = lock.get_user_name_by_id(id) else {
1075 return Ok(());
1076 };
1077 let mut user_info = lock.get_user_by_name(&old_name).unwrap().to_prost();
1078 update_fields.into_iter().for_each(|field| match field {
1079 UpdateField::Super => user_info.is_super = update_user.is_super,
1080 UpdateField::Login => user_info.can_login = update_user.can_login,
1081 UpdateField::CreateDb => user_info.can_create_db = update_user.can_create_db,
1082 UpdateField::CreateUser => user_info.can_create_user = update_user.can_create_user,
1083 UpdateField::AuthInfo => user_info.auth_info.clone_from(&update_user.auth_info),
1084 UpdateField::Rename => user_info.name.clone_from(&update_user.name),
1085 UpdateField::Admin => user_info.is_admin = update_user.is_admin,
1086 UpdateField::Unspecified => unreachable!(),
1087 });
1088 lock.update_user(update_user);
1089 Ok(())
1090 }
1091
1092 async fn grant_privilege(
1095 &self,
1096 users: Vec<UserId>,
1097 privileges: Vec<GrantPrivilege>,
1098 with_grant_option: bool,
1099 _grantor: UserId,
1100 ) -> Result<()> {
1101 let privileges = privileges
1102 .into_iter()
1103 .map(|mut p| {
1104 p.action_with_opts
1105 .iter_mut()
1106 .for_each(|ao| ao.with_grant_option = with_grant_option);
1107 p
1108 })
1109 .collect::<Vec<_>>();
1110 for user_id in users {
1111 if let Some(u) = self.user_info.write().get_user_mut(user_id) {
1112 u.extend_privileges(privileges.clone());
1113 }
1114 }
1115 Ok(())
1116 }
1117
1118 async fn revoke_privilege(
1121 &self,
1122 users: Vec<UserId>,
1123 privileges: Vec<GrantPrivilege>,
1124 _granted_by: UserId,
1125 _revoke_by: UserId,
1126 revoke_grant_option: bool,
1127 _cascade: bool,
1128 ) -> Result<()> {
1129 for user_id in users {
1130 if let Some(u) = self.user_info.write().get_user_mut(user_id) {
1131 u.revoke_privileges(privileges.clone(), revoke_grant_option);
1132 }
1133 }
1134 Ok(())
1135 }
1136
1137 async fn alter_default_privilege(
1138 &self,
1139 _users: Vec<UserId>,
1140 _database_id: DatabaseId,
1141 _schemas: Vec<SchemaId>,
1142 _operation: AlterDefaultPrivilegeOperation,
1143 _operated_by: UserId,
1144 ) -> Result<()> {
1145 todo!()
1146 }
1147}
1148
1149impl MockUserInfoWriter {
1150 pub fn new(user_info: Arc<RwLock<UserInfoManager>>) -> Self {
1151 user_info.write().create_user(UserInfo {
1152 id: DEFAULT_SUPER_USER_ID,
1153 name: DEFAULT_SUPER_USER.to_owned(),
1154 is_super: true,
1155 can_create_db: true,
1156 can_create_user: true,
1157 can_login: true,
1158 ..Default::default()
1159 });
1160 user_info.write().create_user(UserInfo {
1161 id: DEFAULT_SUPER_USER_FOR_ADMIN_ID,
1162 name: DEFAULT_SUPER_USER_FOR_ADMIN.to_owned(),
1163 is_super: true,
1164 can_create_db: true,
1165 can_create_user: true,
1166 can_login: true,
1167 is_admin: true,
1168 ..Default::default()
1169 });
1170 Self {
1171 user_info,
1172 id: AtomicU32::new(NON_RESERVED_USER_ID.as_raw_id()),
1173 }
1174 }
1175
1176 fn gen_id(&self) -> u32 {
1177 self.id.fetch_add(1, Ordering::SeqCst)
1178 }
1179}
1180
1181pub struct MockFrontendMetaClient {}
1182
1183#[async_trait::async_trait]
1184impl FrontendMetaClient for MockFrontendMetaClient {
1185 async fn try_unregister(&self) {}
1186
1187 async fn flush(&self, _database_id: DatabaseId) -> RpcResult<HummockVersionId> {
1188 Ok(INVALID_VERSION_ID)
1189 }
1190
1191 async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult<Vec<u32>> {
1192 Ok(vec![])
1193 }
1194
1195 async fn list_table_fragments(
1196 &self,
1197 _table_ids: &[JobId],
1198 ) -> RpcResult<HashMap<JobId, TableFragmentInfo>> {
1199 Ok(HashMap::default())
1200 }
1201
1202 async fn list_streaming_job_states(&self) -> RpcResult<Vec<StreamingJobState>> {
1203 Ok(vec![])
1204 }
1205
1206 async fn list_fragment_distribution(
1207 &self,
1208 _include_node: bool,
1209 ) -> RpcResult<Vec<FragmentDistribution>> {
1210 Ok(vec![])
1211 }
1212
1213 async fn list_creating_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
1214 Ok(vec![])
1215 }
1216
1217 async fn list_actor_states(&self) -> RpcResult<Vec<ActorState>> {
1218 Ok(vec![])
1219 }
1220
1221 async fn list_actor_splits(&self) -> RpcResult<Vec<ActorSplit>> {
1222 Ok(vec![])
1223 }
1224
1225 async fn list_meta_snapshots(&self) -> RpcResult<Vec<MetaSnapshotMetadata>> {
1226 Ok(vec![])
1227 }
1228
1229 async fn list_sink_log_store_tables(
1230 &self,
1231 ) -> RpcResult<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>> {
1232 Ok(vec![])
1233 }
1234
1235 async fn set_system_param(
1236 &self,
1237 _param: String,
1238 _value: Option<String>,
1239 ) -> RpcResult<Option<SystemParamsReader>> {
1240 Ok(Some(SystemParams::default().into()))
1241 }
1242
1243 async fn get_session_params(&self) -> RpcResult<SessionConfig> {
1244 Ok(Default::default())
1245 }
1246
1247 async fn set_session_param(&self, _param: String, _value: Option<String>) -> RpcResult<String> {
1248 Ok("".to_owned())
1249 }
1250
1251 async fn get_ddl_progress(&self) -> RpcResult<Vec<DdlProgress>> {
1252 Ok(vec![])
1253 }
1254
1255 async fn get_tables(
1256 &self,
1257 _table_ids: Vec<crate::catalog::TableId>,
1258 _include_dropped_tables: bool,
1259 ) -> RpcResult<HashMap<crate::catalog::TableId, Table>> {
1260 Ok(HashMap::new())
1261 }
1262
1263 async fn list_hummock_pinned_versions(&self) -> RpcResult<Vec<(WorkerId, HummockVersionId)>> {
1264 unimplemented!()
1265 }
1266
1267 async fn list_refresh_table_states(&self) -> RpcResult<Vec<RefreshTableState>> {
1268 unimplemented!()
1269 }
1270
1271 async fn get_hummock_current_version(&self) -> RpcResult<HummockVersion> {
1272 Ok(HummockVersion::default())
1273 }
1274
1275 async fn get_hummock_checkpoint_version(&self) -> RpcResult<HummockVersion> {
1276 unimplemented!()
1277 }
1278
1279 async fn list_version_deltas(&self) -> RpcResult<Vec<HummockVersionDelta>> {
1280 unimplemented!()
1281 }
1282
1283 async fn list_branched_objects(&self) -> RpcResult<Vec<BranchedObject>> {
1284 unimplemented!()
1285 }
1286
1287 async fn list_hummock_compaction_group_configs(&self) -> RpcResult<Vec<CompactionGroupInfo>> {
1288 unimplemented!()
1289 }
1290
1291 async fn list_hummock_active_write_limits(
1292 &self,
1293 ) -> RpcResult<HashMap<CompactionGroupId, WriteLimit>> {
1294 unimplemented!()
1295 }
1296
1297 async fn list_hummock_meta_configs(&self) -> RpcResult<HashMap<String, String>> {
1298 unimplemented!()
1299 }
1300
1301 async fn list_event_log(&self) -> RpcResult<Vec<EventLog>> {
1302 Ok(vec![])
1303 }
1304
1305 async fn list_compact_task_assignment(&self) -> RpcResult<Vec<CompactTaskAssignment>> {
1306 unimplemented!()
1307 }
1308
1309 async fn list_all_nodes(&self) -> RpcResult<Vec<WorkerNode>> {
1310 Ok(vec![])
1311 }
1312
1313 async fn list_compact_task_progress(&self) -> RpcResult<Vec<CompactTaskProgress>> {
1314 unimplemented!()
1315 }
1316
1317 async fn recover(&self) -> RpcResult<()> {
1318 unimplemented!()
1319 }
1320
1321 async fn backup_meta(&self, _remarks: Option<String>) -> RpcResult<u64> {
1322 unimplemented!()
1323 }
1324
1325 async fn get_backup_job_status(
1326 &self,
1327 _job_id: u64,
1328 ) -> RpcResult<(risingwave_pb::backup_service::BackupJobStatus, String)> {
1329 unimplemented!()
1330 }
1331
1332 async fn delete_meta_snapshot(&self, _snapshot_ids: &[u64]) -> RpcResult<()> {
1333 unimplemented!()
1334 }
1335
1336 async fn apply_throttle(
1337 &self,
1338 _throttle_target: PbThrottleTarget,
1339 _throttle_type: risingwave_pb::common::PbThrottleType,
1340 _id: u32,
1341 _rate_limit: Option<u32>,
1342 ) -> RpcResult<()> {
1343 unimplemented!()
1344 }
1345
1346 async fn alter_fragment_parallelism(
1347 &self,
1348 _fragment_ids: Vec<FragmentId>,
1349 _parallelism: Option<PbTableParallelism>,
1350 ) -> RpcResult<()> {
1351 unimplemented!()
1352 }
1353
1354 async fn get_cluster_recovery_status(&self) -> RpcResult<RecoveryStatus> {
1355 Ok(RecoveryStatus::StatusRunning)
1356 }
1357
1358 async fn get_cluster_limits(&self) -> RpcResult<Vec<ClusterLimit>> {
1359 Ok(vec![])
1360 }
1361
1362 async fn list_rate_limits(&self) -> RpcResult<Vec<RateLimitInfo>> {
1363 Ok(vec![])
1364 }
1365
1366 async fn list_cdc_progress(&self) -> RpcResult<HashMap<JobId, PbCdcProgress>> {
1367 Ok(HashMap::default())
1368 }
1369
1370 async fn get_meta_store_endpoint(&self) -> RpcResult<String> {
1371 unimplemented!()
1372 }
1373
1374 async fn alter_sink_props(
1375 &self,
1376 _sink_id: SinkId,
1377 _changed_props: BTreeMap<String, String>,
1378 _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1379 _connector_conn_ref: Option<ConnectionId>,
1380 ) -> RpcResult<()> {
1381 unimplemented!()
1382 }
1383
1384 async fn alter_iceberg_table_props(
1385 &self,
1386 _table_id: TableId,
1387 _sink_id: SinkId,
1388 _source_id: SourceId,
1389 _changed_props: BTreeMap<String, String>,
1390 _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1391 _connector_conn_ref: Option<ConnectionId>,
1392 ) -> RpcResult<()> {
1393 unimplemented!()
1394 }
1395
1396 async fn alter_source_connector_props(
1397 &self,
1398 _source_id: SourceId,
1399 _changed_props: BTreeMap<String, String>,
1400 _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1401 _connector_conn_ref: Option<ConnectionId>,
1402 ) -> RpcResult<()> {
1403 unimplemented!()
1404 }
1405
1406 async fn alter_connection_connector_props(
1407 &self,
1408 _connection_id: u32,
1409 _changed_props: BTreeMap<String, String>,
1410 _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1411 ) -> RpcResult<()> {
1412 Ok(())
1413 }
1414
1415 async fn list_hosted_iceberg_tables(&self) -> RpcResult<Vec<IcebergTable>> {
1416 unimplemented!()
1417 }
1418
1419 async fn list_iceberg_compaction_status(&self) -> RpcResult<Vec<IcebergCompactionStatus>> {
1420 Ok(vec![])
1421 }
1422
1423 async fn get_fragment_by_id(
1424 &self,
1425 _fragment_id: FragmentId,
1426 ) -> RpcResult<Option<FragmentDistribution>> {
1427 unimplemented!()
1428 }
1429
1430 async fn get_fragment_vnodes(
1431 &self,
1432 _fragment_id: FragmentId,
1433 ) -> RpcResult<Vec<(ActorId, Vec<u32>)>> {
1434 unimplemented!()
1435 }
1436
1437 async fn get_actor_vnodes(&self, _actor_id: ActorId) -> RpcResult<Vec<u32>> {
1438 unimplemented!()
1439 }
1440
1441 fn worker_id(&self) -> WorkerId {
1442 0.into()
1443 }
1444
1445 async fn set_sync_log_store_aligned(&self, _job_id: JobId, _aligned: bool) -> RpcResult<()> {
1446 Ok(())
1447 }
1448
1449 async fn compact_iceberg_table(&self, _sink_id: SinkId) -> RpcResult<u64> {
1450 Ok(1)
1451 }
1452
1453 async fn expire_iceberg_table_snapshots(&self, _sink_id: SinkId) -> RpcResult<()> {
1454 Ok(())
1455 }
1456
1457 async fn refresh(&self, _request: RefreshRequest) -> RpcResult<RefreshResponse> {
1458 Ok(RefreshResponse { status: None })
1459 }
1460
1461 fn cluster_id(&self) -> &str {
1462 "test-cluster-uuid"
1463 }
1464
1465 async fn list_unmigrated_tables(&self) -> RpcResult<HashMap<crate::catalog::TableId, String>> {
1466 unimplemented!()
1467 }
1468
1469 async fn get_hummock_table_change_log(
1470 &self,
1471 _start_epoch_inclusive: Option<u64>,
1472 _end_epoch_inclusive: Option<u64>,
1473 _table_ids: Option<HashSet<TableId>>,
1474 _exclude_empty: bool,
1475 _limit: Option<u32>,
1476 ) -> RpcResult<TableChangeLogs> {
1477 Ok(HashMap::default())
1478 }
1479
1480 async fn update_compaction_config(
1481 &self,
1482 _compaction_group_ids: Vec<risingwave_hummock_sdk::CompactionGroupId>,
1483 _configs: Vec<PbMutableConfig>,
1484 ) -> RpcResult<()> {
1485 Ok(())
1486 }
1487}
1488
/// Protobuf (proto3) schema source used as a test fixture.
///
/// Declares, in package `test`, the messages `TestRecord`, `TestRecordAlterType` (same field
/// numbers as `TestRecord` but with `id` as `string` and `zipcode` as `int32`), `TestRecordExt`
/// (`TestRecord` plus a `name` field), `Country` and `City`. Only compiled under `cfg(test)`.
#[cfg(test)]
pub static PROTO_FILE_DATA: &str = r#"
 syntax = "proto3";
 package test;
 message TestRecord {
 int32 id = 1;
 Country country = 3;
 int64 zipcode = 4;
 float rate = 5;
 }
 message TestRecordAlterType {
 string id = 1;
 Country country = 3;
 int32 zipcode = 4;
 float rate = 5;
 }
 message TestRecordExt {
 int32 id = 1;
 Country country = 3;
 int64 zipcode = 4;
 float rate = 5;
 string name = 6;
 }
 message Country {
 string address = 1;
 City city = 2;
 string zipcode = 3;
 }
 message City {
 string address = 1;
 string zipcode = 2;
 }"#;
1521
1522pub fn create_proto_file(proto_data: &str) -> NamedTempFile {
1525 let in_file = Builder::new()
1526 .prefix("temp")
1527 .suffix(".proto")
1528 .rand_bytes(8)
1529 .tempfile()
1530 .unwrap();
1531
1532 let out_file = Builder::new()
1533 .prefix("temp")
1534 .suffix(".pb")
1535 .rand_bytes(8)
1536 .tempfile()
1537 .unwrap();
1538
1539 let mut file = in_file.as_file();
1540 file.write_all(proto_data.as_ref())
1541 .expect("writing binary to test file");
1542 file.flush().expect("flush temp file failed");
1543 let include_path = in_file
1544 .path()
1545 .parent()
1546 .unwrap()
1547 .to_string_lossy()
1548 .into_owned();
1549 let out_path = out_file.path().to_string_lossy().into_owned();
1550 let in_path = in_file.path().to_string_lossy().into_owned();
1551 let mut compile = std::process::Command::new("protoc");
1552
1553 let out = compile
1554 .arg("--include_imports")
1555 .arg("-I")
1556 .arg(include_path)
1557 .arg(format!("--descriptor_set_out={}", out_path))
1558 .arg(in_path)
1559 .output()
1560 .expect("failed to compile proto");
1561 if !out.status.success() {
1562 panic!("compile proto failed \n output: {:?}", out);
1563 }
1564 out_file
1565}