1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::io::Write;
17use std::net::{IpAddr, Ipv4Addr, SocketAddr};
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU32, Ordering};
20
21use futures_async_stream::for_await;
22use parking_lot::RwLock;
23use pgwire::net::{Address, AddressRef};
24use pgwire::pg_response::StatementType;
25use pgwire::pg_server::{SessionId, SessionManager, UserAuthenticator};
26use pgwire::types::Row;
27use risingwave_common::catalog::{
28 AlterDatabaseParam, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER,
29 DEFAULT_SUPER_USER_FOR_ADMIN, DEFAULT_SUPER_USER_FOR_ADMIN_ID, DEFAULT_SUPER_USER_ID,
30 FunctionId, IndexId, NON_RESERVED_USER_ID, ObjectId, PG_CATALOG_SCHEMA_NAME,
31 RW_CATALOG_SCHEMA_NAME, TableId,
32};
33use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat};
34use risingwave_common::id::{ConnectionId, JobId, SourceId, SubscriptionId, ViewId, WorkerId};
35use risingwave_common::session_config::SessionConfig;
36use risingwave_common::system_param::AdaptiveParallelismStrategy;
37use risingwave_common::system_param::reader::SystemParamsReader;
38use risingwave_common::util::cluster_limit::ClusterLimit;
39use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
40use risingwave_hummock_sdk::change_log::TableChangeLogs;
41use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
42use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId, INVALID_VERSION_ID};
43use risingwave_pb::backup_service::MetaSnapshotMetadata;
44use risingwave_pb::catalog::{
45 PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbStreamJobStatus,
46 PbSubscription, PbTable, PbView, Table,
47};
48use risingwave_pb::common::{PbObjectType, WorkerNode};
49use risingwave_pb::ddl_service::alter_owner_request::Object;
50use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
51use risingwave_pb::ddl_service::{
52 DdlProgress, PbTableJobType, TableJobType, alter_name_request, alter_set_schema_request,
53 alter_swap_rename_request, create_connection_request, streaming_job_resource_type,
54};
55use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig as PbMutableConfig;
56use risingwave_pb::hummock::write_limits::WriteLimit;
57use risingwave_pb::hummock::{
58 BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
59};
60use risingwave_pb::id::ActorId;
61use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
62use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
63use risingwave_pb::meta::list_actor_states_response::ActorState;
64use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
65use risingwave_pb::meta::list_iceberg_compaction_status_response::IcebergCompactionStatus;
66use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
67use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
68use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
69use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
70use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
71use risingwave_pb::meta::{
72 EventLog, FragmentDistribution, ObjectDependency as PbObjectDependency, PbTableParallelism,
73 PbThrottleTarget, RecoveryStatus, RefreshRequest, RefreshResponse, SystemParams,
74 list_sink_log_store_tables_response,
75};
76use risingwave_pb::secret::PbSecretRef;
77use risingwave_pb::stream_plan::StreamFragmentGraph;
78use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
79use risingwave_pb::user::update_user_request::UpdateField;
80use risingwave_pb::user::{GrantPrivilege, UserInfo};
81use risingwave_rpc_client::error::Result as RpcResult;
82use tempfile::{Builder, NamedTempFile};
83
84use crate::FrontendOpts;
85use crate::catalog::catalog_service::CatalogWriter;
86use crate::catalog::root_catalog::Catalog;
87use crate::catalog::{DatabaseId, FragmentId, SchemaId, SecretId, SinkId};
88use crate::error::{ErrorCode, Result, RwError};
89use crate::handler::RwPgResponse;
90use crate::meta_client::FrontendMetaClient;
91use crate::scheduler::HummockSnapshotManagerRef;
92use crate::session::{AuthContext, FrontendEnv, SessionImpl};
93use crate::user::UserId;
94use crate::user::user_manager::UserInfoManager;
95use crate::user::user_service::UserInfoWriter;
96
/// An in-process frontend handle that creates sessions and runs SQL through them.
pub struct LocalFrontend {
    /// Options this frontend was constructed with.
    pub opts: FrontendOpts,
    /// Environment cloned into every session created by this frontend.
    env: FrontendEnv,
}
102
/// [`SessionManager`] implementation for [`LocalFrontend`].
///
/// Only `connect` is implemented; every other method panics via `unreachable!()`.
impl SessionManager for LocalFrontend {
    type Error = RwError;
    type Session = SessionImpl;

    /// Not supported by `LocalFrontend`; panics via `unreachable!()` if called.
    fn create_dummy_session(
        &self,
        _database_id: DatabaseId,
    ) -> std::result::Result<Arc<Self::Session>, Self::Error> {
        unreachable!()
    }

    /// Returns a fresh default session; all three arguments are ignored.
    fn connect(
        &self,
        _database: &str,
        _user_name: &str,
        _peer_addr: AddressRef,
    ) -> std::result::Result<Arc<Self::Session>, Self::Error> {
        Ok(self.session_ref())
    }

    /// Not supported by `LocalFrontend`; panics via `unreachable!()` if called.
    fn cancel_queries_in_session(&self, _session_id: SessionId) {
        unreachable!()
    }

    /// Not supported by `LocalFrontend`; panics via `unreachable!()` if called.
    fn cancel_creating_jobs_in_session(&self, _session_id: SessionId) {
        unreachable!()
    }

    /// Not supported by `LocalFrontend`; panics via `unreachable!()` if called.
    fn end_session(&self, _session: &Self::Session) {
        unreachable!()
    }
}
135
136impl LocalFrontend {
137 #[expect(clippy::unused_async)]
138 pub async fn new(opts: FrontendOpts) -> Self {
139 let env = FrontendEnv::mock();
140 Self { opts, env }
141 }
142
143 pub async fn run_sql(
144 &self,
145 sql: impl Into<String>,
146 ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
147 let sql: Arc<str> = Arc::from(sql.into());
148 Box::pin(self.session_ref().run_statement(sql, vec![])).await
149 }
150
151 pub async fn run_sql_with_session(
152 &self,
153 session_ref: Arc<SessionImpl>,
154 sql: impl Into<String>,
155 ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
156 let sql: Arc<str> = Arc::from(sql.into());
157 Box::pin(session_ref.run_statement(sql, vec![])).await
158 }
159
160 pub async fn run_user_sql(
161 &self,
162 sql: impl Into<String>,
163 database: String,
164 user_name: String,
165 user_id: UserId,
166 ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
167 let sql: Arc<str> = Arc::from(sql.into());
168 Box::pin(
169 self.session_user_ref(database, user_name, user_id)
170 .run_statement(sql, vec![]),
171 )
172 .await
173 }
174
175 pub async fn query_formatted_result(&self, sql: impl Into<String>) -> Vec<String> {
176 let mut rsp = self.run_sql(sql).await.unwrap();
177 let mut res = vec![];
178 #[for_await]
179 for row_set in rsp.values_stream() {
180 for row in row_set.unwrap() {
181 res.push(format!("{:?}", row));
182 }
183 }
184 res
185 }
186
187 pub async fn get_explain_output(&self, sql: impl Into<String>) -> String {
188 let mut rsp = self.run_sql(sql).await.unwrap();
189 assert_eq!(rsp.stmt_type(), StatementType::EXPLAIN);
190 let mut res = String::new();
191 #[for_await]
192 for row_set in rsp.values_stream() {
193 for row in row_set.unwrap() {
194 let row: Row = row;
195 let row = row.values()[0].as_ref().unwrap();
196 res += std::str::from_utf8(row).unwrap();
197 res += "\n";
198 }
199 }
200 res
201 }
202
203 pub fn session_ref(&self) -> Arc<SessionImpl> {
205 self.session_user_ref(
206 DEFAULT_DATABASE_NAME.to_owned(),
207 DEFAULT_SUPER_USER.to_owned(),
208 DEFAULT_SUPER_USER_ID,
209 )
210 }
211
212 pub fn session_user_ref(
213 &self,
214 database: String,
215 user_name: String,
216 user_id: UserId,
217 ) -> Arc<SessionImpl> {
218 Arc::new(SessionImpl::new(
219 self.env.clone(),
220 AuthContext::new(database, user_name, user_id),
221 UserAuthenticator::None,
222 (0, 0),
224 Address::Tcp(SocketAddr::new(
225 IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
226 6666,
227 ))
228 .into(),
229 Default::default(),
230 ))
231 }
232}
233
234pub async fn get_explain_output(mut rsp: RwPgResponse) -> String {
235 if rsp.stmt_type() != StatementType::EXPLAIN {
236 panic!("RESPONSE INVALID: {rsp:?}");
237 }
238 let mut res = String::new();
239 #[for_await]
240 for row_set in rsp.values_stream() {
241 for row in row_set.unwrap() {
242 let row: Row = row;
243 let row = row.values()[0].as_ref().unwrap();
244 res += std::str::from_utf8(row).unwrap();
245 res += "\n";
246 }
247 }
248 res
249}
250
/// A `CatalogWriter` that applies changes directly to a shared in-memory [`Catalog`].
pub struct MockCatalogWriter {
    /// The catalog that every operation reads and mutates.
    catalog: Arc<RwLock<Catalog>>,
    /// Counter backing `gen_id`; holds the most recently handed-out id.
    id: AtomicU32,
    /// Raw id of each tracked object (table, source, sink, ...) mapped to its schema.
    table_id_to_schema_id: RwLock<HashMap<u32, SchemaId>>,
    /// Each known schema mapped to the database that contains it.
    schema_id_to_database_id: RwLock<HashMap<SchemaId, DatabaseId>>,
    /// Notified via `add_table_for_test` when a materialized view or table is created.
    hummock_snapshot_manager: HummockSnapshotManagerRef,
}
258
259#[async_trait::async_trait]
260impl CatalogWriter for MockCatalogWriter {
261 async fn create_database(
262 &self,
263 db_name: &str,
264 owner: UserId,
265 resource_group: &str,
266 barrier_interval_ms: Option<u32>,
267 checkpoint_frequency: Option<u64>,
268 ) -> Result<()> {
269 let database_id = DatabaseId::new(self.gen_id());
270 self.catalog.write().create_database(&PbDatabase {
271 name: db_name.to_owned(),
272 id: database_id,
273 owner,
274 resource_group: resource_group.to_owned(),
275 barrier_interval_ms,
276 checkpoint_frequency,
277 });
278 self.create_schema(database_id, DEFAULT_SCHEMA_NAME, owner)
279 .await?;
280 self.create_schema(database_id, PG_CATALOG_SCHEMA_NAME, owner)
281 .await?;
282 self.create_schema(database_id, RW_CATALOG_SCHEMA_NAME, owner)
283 .await?;
284 Ok(())
285 }
286
287 async fn create_schema(
288 &self,
289 db_id: DatabaseId,
290 schema_name: &str,
291 owner: UserId,
292 ) -> Result<()> {
293 let id = self.gen_id();
294 self.catalog.write().create_schema(&PbSchema {
295 id,
296 name: schema_name.to_owned(),
297 database_id: db_id,
298 owner,
299 });
300 self.add_schema_id(id, db_id);
301 Ok(())
302 }
303
304 async fn create_materialized_view(
305 &self,
306 mut table: PbTable,
307 _graph: StreamFragmentGraph,
308 dependencies: HashSet<ObjectId>,
309 _resource_type: streaming_job_resource_type::ResourceType,
310 _if_not_exists: bool,
311 _refresh_interval_sec: Option<u64>,
312 ) -> Result<()> {
313 table.id = self.gen_id();
314 table.stream_job_status = PbStreamJobStatus::Created as _;
315 table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
316 self.catalog.write().create_table(&table);
317 self.add_table_or_source_id(table.id.as_raw_id(), table.schema_id, table.database_id);
318 self.insert_object_dependencies(table.id.as_object_id(), dependencies);
319 self.hummock_snapshot_manager.add_table_for_test(table.id);
320 Ok(())
321 }
322
323 async fn replace_materialized_view(
324 &self,
325 mut table: PbTable,
326 _graph: StreamFragmentGraph,
327 ) -> Result<()> {
328 table.stream_job_status = PbStreamJobStatus::Created as _;
329 assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
330 self.catalog.write().update_table(&table);
331 Ok(())
332 }
333
334 async fn create_view(&self, mut view: PbView, dependencies: HashSet<ObjectId>) -> Result<()> {
335 view.id = self.gen_id();
336 self.catalog.write().create_view(&view);
337 self.add_table_or_source_id(view.id.as_raw_id(), view.schema_id, view.database_id);
338 self.insert_object_dependencies(view.id.as_object_id(), dependencies);
339 Ok(())
340 }
341
342 async fn create_table(
343 &self,
344 source: Option<PbSource>,
345 mut table: PbTable,
346 graph: StreamFragmentGraph,
347 _job_type: PbTableJobType,
348 if_not_exists: bool,
349 dependencies: HashSet<ObjectId>,
350 ) -> Result<()> {
351 if let Some(source) = source {
352 let source_id = self.create_source_inner(source)?;
353 table.optional_associated_source_id = Some(source_id.into());
354 }
355 self.create_materialized_view(
356 table,
357 graph,
358 dependencies,
359 streaming_job_resource_type::ResourceType::Regular(true),
360 if_not_exists,
361 None,
362 )
363 .await?;
364 Ok(())
365 }
366
367 async fn replace_table(
368 &self,
369 _source: Option<PbSource>,
370 mut table: PbTable,
371 _graph: StreamFragmentGraph,
372 _job_type: TableJobType,
373 ) -> Result<()> {
374 table.stream_job_status = PbStreamJobStatus::Created as _;
375 assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
376 self.catalog.write().update_table(&table);
377 Ok(())
378 }
379
    /// Overwrites the catalog entry for `source`; the fragment graph is ignored.
    async fn replace_source(&self, source: PbSource, _graph: StreamFragmentGraph) -> Result<()> {
        self.catalog.write().update_source(&source);
        Ok(())
    }
384
385 async fn create_source(
386 &self,
387 source: PbSource,
388 _graph: Option<StreamFragmentGraph>,
389 _if_not_exists: bool,
390 ) -> Result<()> {
391 self.create_source_inner(source).map(|_| ())
392 }
393
394 async fn create_sink(
395 &self,
396 sink: PbSink,
397 graph: StreamFragmentGraph,
398 dependencies: HashSet<ObjectId>,
399 _resource_type: streaming_job_resource_type::ResourceType,
400 _if_not_exists: bool,
401 ) -> Result<()> {
402 let sink_id = self.create_sink_inner(sink, graph)?;
403 self.insert_object_dependencies(sink_id.as_object_id(), dependencies);
404 Ok(())
405 }
406
    /// Adds `subscription` to the catalog; delegates to `create_subscription_inner`.
    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
        self.create_subscription_inner(subscription)
    }
410
411 async fn create_index(
412 &self,
413 mut index: PbIndex,
414 mut index_table: PbTable,
415 _graph: StreamFragmentGraph,
416 _resource_type: streaming_job_resource_type::ResourceType,
417 _if_not_exists: bool,
418 ) -> Result<()> {
419 index_table.id = self.gen_id();
420 index_table.stream_job_status = PbStreamJobStatus::Created as _;
421 index_table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
422 self.catalog.write().create_table(&index_table);
423 self.add_table_or_index_id(
424 index_table.id.as_raw_id(),
425 index_table.schema_id,
426 index_table.database_id,
427 );
428
429 index.id = index_table.id.as_raw_id().into();
430 index.index_table_id = index_table.id;
431 self.catalog.write().create_index(&index);
432 Ok(())
433 }
434
    /// Not supported by the mock; panics via `unreachable!()` if called.
    async fn create_function(&self, _function: PbFunction) -> Result<()> {
        unreachable!()
    }
438
    /// Not supported by the mock; panics via `unreachable!()` if called.
    async fn create_connection(
        &self,
        _connection_name: String,
        _database_id: DatabaseId,
        _schema_id: SchemaId,
        _owner_id: UserId,
        _connection: create_connection_request::Payload,
    ) -> Result<()> {
        unreachable!()
    }
449
    /// Not supported by the mock; panics via `unreachable!()` if called.
    async fn create_secret(
        &self,
        _secret_name: String,
        _database_id: DatabaseId,
        _schema_id: SchemaId,
        _owner_id: UserId,
        _payload: Vec<u8>,
    ) -> Result<()> {
        unreachable!()
    }
460
    /// Not supported by the mock; panics via `unreachable!()` if called.
    async fn comment_on(&self, _comment: PbComment) -> Result<()> {
        unreachable!()
    }
464
465 async fn drop_table(
466 &self,
467 source_id: Option<SourceId>,
468 table_id: TableId,
469 cascade: bool,
470 ) -> Result<()> {
471 if cascade {
472 return Err(ErrorCode::NotSupported(
473 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
474 "use drop instead".to_owned(),
475 )
476 .into());
477 }
478 if let Some(source_id) = source_id {
479 self.drop_table_or_source_id(source_id.as_raw_id());
480 }
481 let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
482 let indexes =
483 self.catalog
484 .read()
485 .get_all_indexes_related_to_object(database_id, schema_id, table_id);
486 for index in indexes {
487 self.drop_index(index.id, cascade).await?;
488 }
489 self.catalog
490 .write()
491 .drop_table(database_id, schema_id, table_id);
492 if let Some(source_id) = source_id {
493 self.catalog
494 .write()
495 .drop_source(database_id, schema_id, source_id);
496 }
497 Ok(())
498 }
499
    /// Not supported by the mock; panics via `unreachable!()` if called.
    async fn drop_view(&self, _view_id: ViewId, _cascade: bool) -> Result<()> {
        unreachable!()
    }
503
504 async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
505 if cascade {
506 return Err(ErrorCode::NotSupported(
507 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
508 "use drop instead".to_owned(),
509 )
510 .into());
511 }
512 let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
513 let indexes =
514 self.catalog
515 .read()
516 .get_all_indexes_related_to_object(database_id, schema_id, table_id);
517 for index in indexes {
518 self.drop_index(index.id, cascade).await?;
519 }
520 self.catalog
521 .write()
522 .drop_table(database_id, schema_id, table_id);
523 Ok(())
524 }
525
526 async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()> {
527 if cascade {
528 return Err(ErrorCode::NotSupported(
529 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
530 "use drop instead".to_owned(),
531 )
532 .into());
533 }
534 let (database_id, schema_id) = self.drop_table_or_source_id(source_id.as_raw_id());
535 self.catalog
536 .write()
537 .drop_source(database_id, schema_id, source_id);
538 Ok(())
539 }
540
    /// No-op in the mock: always returns `Ok(())` without touching the catalog.
    async fn reset_source(&self, _source_id: SourceId) -> Result<()> {
        Ok(())
    }
544
545 async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()> {
546 if cascade {
547 return Err(ErrorCode::NotSupported(
548 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
549 "use drop instead".to_owned(),
550 )
551 .into());
552 }
553 let (database_id, schema_id) = self.drop_table_or_sink_id(sink_id.as_raw_id());
554 self.catalog
555 .write()
556 .drop_sink(database_id, schema_id, sink_id);
557 Ok(())
558 }
559
560 async fn drop_subscription(
561 &self,
562 subscription_id: SubscriptionId,
563 cascade: bool,
564 ) -> Result<()> {
565 if cascade {
566 return Err(ErrorCode::NotSupported(
567 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
568 "use drop instead".to_owned(),
569 )
570 .into());
571 }
572 let (database_id, schema_id) =
573 self.drop_table_or_subscription_id(subscription_id.as_raw_id());
574 self.catalog
575 .write()
576 .drop_subscription(database_id, schema_id, subscription_id);
577 Ok(())
578 }
579
580 async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
581 if cascade {
582 return Err(ErrorCode::NotSupported(
583 "drop cascade in MockCatalogWriter is unsupported".to_owned(),
584 "use drop instead".to_owned(),
585 )
586 .into());
587 }
588 let &schema_id = self
589 .table_id_to_schema_id
590 .read()
591 .get(&index_id.as_raw_id())
592 .unwrap();
593 let database_id = self.get_database_id_by_schema(schema_id);
594
595 let index = {
596 let catalog_reader = self.catalog.read();
597 let schema_catalog = catalog_reader
598 .get_schema_by_id(database_id, schema_id)
599 .unwrap();
600 schema_catalog.get_index_by_id(index_id).unwrap().clone()
601 };
602
603 let index_table_id = index.index_table().id;
604 let (database_id, schema_id) = self.drop_table_or_index_id(index_id.as_raw_id());
605 self.catalog
606 .write()
607 .drop_index(database_id, schema_id, index_id);
608 self.catalog
609 .write()
610 .drop_table(database_id, schema_id, index_table_id);
611 Ok(())
612 }
613
    /// Not supported by the mock; panics via `unreachable!()` if called.
    async fn drop_function(&self, _function_id: FunctionId, _cascade: bool) -> Result<()> {
        unreachable!()
    }
617
    /// Not supported by the mock; panics via `unreachable!()` if called.
    async fn drop_connection(&self, _connection_id: ConnectionId, _cascade: bool) -> Result<()> {
        unreachable!()
    }
621
    /// Not supported by the mock; panics via `unreachable!()` if called.
    async fn drop_secret(&self, _secret_id: SecretId, _cascade: bool) -> Result<()> {
        unreachable!()
    }
625
    /// Removes the database from the catalog.
    ///
    /// The writer's own schema and table id maps are not touched here.
    async fn drop_database(&self, database_id: DatabaseId) -> Result<()> {
        self.catalog.write().drop_database(database_id);
        Ok(())
    }
630
    /// Forgets the schema-to-database mapping and removes the schema from the catalog.
    /// `_cascade` is ignored.
    ///
    /// # Panics
    /// Panics (inside `drop_schema_id`) if the schema has no recorded database mapping.
    async fn drop_schema(&self, schema_id: SchemaId, _cascade: bool) -> Result<()> {
        let database_id = self.drop_schema_id(schema_id);
        self.catalog.write().drop_schema(database_id, schema_id);
        Ok(())
    }
636
637 async fn alter_name(
638 &self,
639 object_id: alter_name_request::Object,
640 object_name: &str,
641 ) -> Result<()> {
642 match object_id {
643 alter_name_request::Object::TableId(table_id) => {
644 self.catalog
645 .write()
646 .alter_table_name_by_id(table_id, object_name);
647 Ok(())
648 }
649 _ => {
650 unimplemented!()
651 }
652 }
653 }
654
    /// Overwrites the catalog entry for `source`.
    async fn alter_source(&self, source: PbSource) -> Result<()> {
        self.catalog.write().update_source(&source);
        Ok(())
    }
659
660 async fn alter_owner(&self, object: Object, owner_id: UserId) -> Result<()> {
661 for database in self.catalog.read().iter_databases() {
662 for schema in database.iter_schemas() {
663 match object {
664 Object::TableId(table_id) => {
665 if let Some(table) = schema.get_created_table_by_id(TableId::from(table_id))
666 {
667 let mut pb_table = table.to_prost();
668 pb_table.owner = owner_id;
669 self.catalog.write().update_table(&pb_table);
670 return Ok(());
671 }
672 }
673 _ => unreachable!(),
674 }
675 }
676 }
677
678 Err(ErrorCode::ItemNotFound(format!("object not found: {:?}", object)).into())
679 }
680
681 async fn alter_subscription_retention(
682 &self,
683 subscription_id: SubscriptionId,
684 retention_seconds: u64,
685 definition: String,
686 ) -> Result<()> {
687 for database in self.catalog.read().iter_databases() {
688 for schema in database.iter_schemas() {
689 if let Some(subscription) = schema.get_subscription_by_id(subscription_id) {
690 let mut pb_subscription = subscription.to_proto();
691 pb_subscription.retention_seconds = retention_seconds;
692 pb_subscription.definition = definition;
693 self.catalog.write().update_subscription(&pb_subscription);
694 return Ok(());
695 }
696 }
697 }
698
699 Err(
700 ErrorCode::ItemNotFound(format!("subscription not found: {:?}", subscription_id))
701 .into(),
702 )
703 }
704
705 async fn alter_set_schema(
706 &self,
707 object: alter_set_schema_request::Object,
708 new_schema_id: SchemaId,
709 ) -> Result<()> {
710 match object {
711 alter_set_schema_request::Object::TableId(table_id) => {
712 let mut pb_table = {
713 let reader = self.catalog.read();
714 let table = reader.get_any_table_by_id(table_id)?.to_owned();
715 table.to_prost()
716 };
717 pb_table.schema_id = new_schema_id;
718 self.catalog.write().update_table(&pb_table);
719 self.table_id_to_schema_id
720 .write()
721 .insert(table_id.as_raw_id(), new_schema_id);
722 Ok(())
723 }
724 _ => unreachable!(),
725 }
726 }
727
    /// Not implemented in the mock; panics via `todo!()` if called.
    async fn alter_parallelism(
        &self,
        _job_id: JobId,
        _parallelism: PbTableParallelism,
        _adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }
737
    /// Not implemented in the mock; panics via `todo!()` if called.
    async fn alter_backfill_parallelism(
        &self,
        _job_id: JobId,
        _parallelism: Option<PbTableParallelism>,
        _adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }
747
    /// Not implemented in the mock; panics via `todo!()` if called.
    async fn alter_config(
        &self,
        _job_id: JobId,
        _entries_to_add: HashMap<String, String>,
        _keys_to_remove: Vec<String>,
    ) -> Result<()> {
        todo!()
    }
756
    /// Not implemented in the mock; panics via `todo!()` if called.
    async fn alter_swap_rename(&self, _object: alter_swap_rename_request::Object) -> Result<()> {
        todo!()
    }
760
    /// Not supported by the mock; panics via `unreachable!()` if called.
    async fn alter_secret(
        &self,
        _secret_id: SecretId,
        _secret_name: String,
        _database_id: DatabaseId,
        _schema_id: SchemaId,
        _owner_id: UserId,
        _payload: Vec<u8>,
    ) -> Result<()> {
        unreachable!()
    }
772
    /// Not implemented in the mock; panics via `todo!()` if called.
    async fn alter_resource_group(
        &self,
        _job_id: JobId,
        _resource_group: Option<String>,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }
781
782 async fn alter_database_resource_group(
783 &self,
784 database_id: DatabaseId,
785 resource_group: Option<String>,
786 _deferred: bool,
787 ) -> Result<()> {
788 let mut pb_database = {
789 let reader = self.catalog.read();
790 let database = reader.get_database_by_id(database_id)?.to_owned();
791 database.to_prost()
792 };
793 pb_database.resource_group =
794 resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned());
795 self.catalog.write().update_database(&pb_database);
796 Ok(())
797 }
798
799 async fn alter_database_param(
800 &self,
801 database_id: DatabaseId,
802 param: AlterDatabaseParam,
803 ) -> Result<()> {
804 let mut pb_database = {
805 let reader = self.catalog.read();
806 let database = reader.get_database_by_id(database_id)?.to_owned();
807 database.to_prost()
808 };
809 match param {
810 AlterDatabaseParam::BarrierIntervalMs(interval) => {
811 pb_database.barrier_interval_ms = interval;
812 }
813 AlterDatabaseParam::CheckpointFrequency(frequency) => {
814 pb_database.checkpoint_frequency = frequency;
815 }
816 }
817 self.catalog.write().update_database(&pb_database);
818 Ok(())
819 }
820
    /// Not implemented in the mock; panics via `todo!()` if called.
    async fn create_iceberg_table(
        &self,
        _table_job_info: PbTableJobInfo,
        _sink_job_info: PbSinkJobInfo,
        _iceberg_source: PbSource,
        _if_not_exists: bool,
    ) -> Result<()> {
        todo!()
    }
830
    /// No-op in the mock: returns `Ok(())` immediately, whatever `_job_id` is.
    async fn wait(&self, _job_id: Option<JobId>) -> Result<()> {
        Ok(())
    }
834}
835
836impl MockCatalogWriter {
837 pub fn new(
838 catalog: Arc<RwLock<Catalog>>,
839 hummock_snapshot_manager: HummockSnapshotManagerRef,
840 ) -> Self {
841 catalog.write().create_database(&PbDatabase {
842 id: 0.into(),
843 name: DEFAULT_DATABASE_NAME.to_owned(),
844 owner: DEFAULT_SUPER_USER_ID,
845 resource_group: DEFAULT_RESOURCE_GROUP.to_owned(),
846 barrier_interval_ms: None,
847 checkpoint_frequency: None,
848 });
849 catalog.write().create_schema(&PbSchema {
850 id: 1.into(),
851 name: DEFAULT_SCHEMA_NAME.to_owned(),
852 database_id: 0.into(),
853 owner: DEFAULT_SUPER_USER_ID,
854 });
855 catalog.write().create_schema(&PbSchema {
856 id: 2.into(),
857 name: PG_CATALOG_SCHEMA_NAME.to_owned(),
858 database_id: 0.into(),
859 owner: DEFAULT_SUPER_USER_ID,
860 });
861 catalog.write().create_schema(&PbSchema {
862 id: 3.into(),
863 name: RW_CATALOG_SCHEMA_NAME.to_owned(),
864 database_id: 0.into(),
865 owner: DEFAULT_SUPER_USER_ID,
866 });
867 let mut map: HashMap<SchemaId, DatabaseId> = HashMap::new();
868 map.insert(1_u32.into(), 0_u32.into());
869 map.insert(2_u32.into(), 0_u32.into());
870 map.insert(3_u32.into(), 0_u32.into());
871 Self {
872 catalog,
873 id: AtomicU32::new(3),
874 table_id_to_schema_id: Default::default(),
875 schema_id_to_database_id: RwLock::new(map),
876 hummock_snapshot_manager,
877 }
878 }
879
880 fn gen_id<T: From<u32>>(&self) -> T {
881 (self.id.fetch_add(1, Ordering::SeqCst) + 1).into()
883 }
884
885 fn add_table_or_source_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
886 self.table_id_to_schema_id
887 .write()
888 .insert(table_id, schema_id);
889 }
890
891 fn drop_table_or_source_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
892 let schema_id = self
893 .table_id_to_schema_id
894 .write()
895 .remove(&table_id)
896 .unwrap();
897 (self.get_database_id_by_schema(schema_id), schema_id)
898 }
899
900 fn add_table_or_sink_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
901 self.table_id_to_schema_id
902 .write()
903 .insert(table_id, schema_id);
904 }
905
906 fn add_table_or_subscription_id(
907 &self,
908 table_id: u32,
909 schema_id: SchemaId,
910 _database_id: DatabaseId,
911 ) {
912 self.table_id_to_schema_id
913 .write()
914 .insert(table_id, schema_id);
915 }
916
917 fn add_table_or_index_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
918 self.table_id_to_schema_id
919 .write()
920 .insert(table_id, schema_id);
921 }
922
923 fn drop_table_or_sink_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
924 let schema_id = self
925 .table_id_to_schema_id
926 .write()
927 .remove(&table_id)
928 .unwrap();
929 (self.get_database_id_by_schema(schema_id), schema_id)
930 }
931
932 fn drop_table_or_subscription_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
933 let schema_id = self
934 .table_id_to_schema_id
935 .write()
936 .remove(&table_id)
937 .unwrap();
938 (self.get_database_id_by_schema(schema_id), schema_id)
939 }
940
941 fn drop_table_or_index_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
942 let schema_id = self
943 .table_id_to_schema_id
944 .write()
945 .remove(&table_id)
946 .unwrap();
947 (self.get_database_id_by_schema(schema_id), schema_id)
948 }
949
950 fn add_schema_id(&self, schema_id: SchemaId, database_id: DatabaseId) {
951 self.schema_id_to_database_id
952 .write()
953 .insert(schema_id, database_id);
954 }
955
956 fn drop_schema_id(&self, schema_id: SchemaId) -> DatabaseId {
957 self.schema_id_to_database_id
958 .write()
959 .remove(&schema_id)
960 .unwrap()
961 }
962
963 fn create_source_inner(&self, mut source: PbSource) -> Result<SourceId> {
964 source.id = self.gen_id();
965 self.catalog.write().create_source(&source);
966 self.add_table_or_source_id(source.id.as_raw_id(), source.schema_id, source.database_id);
967 Ok(source.id)
968 }
969
970 fn create_sink_inner(&self, mut sink: PbSink, _graph: StreamFragmentGraph) -> Result<SinkId> {
971 sink.id = self.gen_id();
972 sink.stream_job_status = PbStreamJobStatus::Created as _;
973 self.catalog.write().create_sink(&sink);
974 self.add_table_or_sink_id(sink.id.as_raw_id(), sink.schema_id, sink.database_id);
975 Ok(sink.id)
976 }
977
978 fn create_subscription_inner(&self, mut subscription: PbSubscription) -> Result<()> {
979 subscription.id = self.gen_id();
980 self.catalog.write().create_subscription(&subscription);
981 self.add_table_or_subscription_id(
982 subscription.id.as_raw_id(),
983 subscription.schema_id,
984 subscription.database_id,
985 );
986 Ok(())
987 }
988
989 fn get_database_id_by_schema(&self, schema_id: SchemaId) -> DatabaseId {
990 *self
991 .schema_id_to_database_id
992 .read()
993 .get(&schema_id)
994 .unwrap()
995 }
996
997 fn get_object_type(&self, object_id: ObjectId) -> PbObjectType {
998 let catalog = self.catalog.read();
999 for database in catalog.iter_databases() {
1000 for schema in database.iter_schemas() {
1001 if let Some(table) = schema.get_created_table_by_id(object_id.as_table_id()) {
1002 return if table.is_mview() {
1003 PbObjectType::Mview
1004 } else {
1005 PbObjectType::Table
1006 };
1007 }
1008 if schema.get_source_by_id(object_id.as_source_id()).is_some() {
1009 return PbObjectType::Source;
1010 }
1011 if schema.get_view_by_id(object_id.as_view_id()).is_some() {
1012 return PbObjectType::View;
1013 }
1014 if schema.get_index_by_id(object_id.as_index_id()).is_some() {
1015 return PbObjectType::Index;
1016 }
1017 }
1018 }
1019 PbObjectType::Unspecified
1020 }
1021
1022 fn insert_object_dependencies(&self, object_id: ObjectId, dependencies: HashSet<ObjectId>) {
1023 if dependencies.is_empty() {
1024 return;
1025 }
1026 let dependencies = dependencies
1027 .into_iter()
1028 .map(|referenced_object_id| PbObjectDependency {
1029 object_id,
1030 referenced_object_id,
1031 referenced_object_type: self.get_object_type(referenced_object_id) as i32,
1032 })
1033 .collect();
1034 self.catalog
1035 .write()
1036 .insert_object_dependencies(dependencies);
1037 }
1038}
1039
/// A `UserInfoWriter` that applies changes directly to a shared [`UserInfoManager`].
pub struct MockUserInfoWriter {
    /// Counter used to generate ids for newly created users.
    id: AtomicU32,
    /// The user registry that every operation mutates.
    user_info: Arc<RwLock<UserInfoManager>>,
}
1044
1045#[async_trait::async_trait]
1046impl UserInfoWriter for MockUserInfoWriter {
1047 async fn create_user(&self, user: UserInfo) -> Result<()> {
1048 let mut user = user;
1049 user.id = self.gen_id().into();
1050 self.user_info.write().create_user(user);
1051 Ok(())
1052 }
1053
    /// Removes the user with the given id from the registry.
    async fn drop_user(&self, id: UserId) -> Result<()> {
        self.user_info.write().drop_user(id);
        Ok(())
    }
1058
1059 async fn update_user(
1060 &self,
1061 update_user: UserInfo,
1062 update_fields: Vec<UpdateField>,
1063 ) -> Result<()> {
1064 let mut lock = self.user_info.write();
1065 let id = update_user.get_id();
1066 let Some(old_name) = lock.get_user_name_by_id(id) else {
1067 return Ok(());
1068 };
1069 let mut user_info = lock.get_user_by_name(&old_name).unwrap().to_prost();
1070 update_fields.into_iter().for_each(|field| match field {
1071 UpdateField::Super => user_info.is_super = update_user.is_super,
1072 UpdateField::Login => user_info.can_login = update_user.can_login,
1073 UpdateField::CreateDb => user_info.can_create_db = update_user.can_create_db,
1074 UpdateField::CreateUser => user_info.can_create_user = update_user.can_create_user,
1075 UpdateField::AuthInfo => user_info.auth_info.clone_from(&update_user.auth_info),
1076 UpdateField::Rename => user_info.name.clone_from(&update_user.name),
1077 UpdateField::Admin => user_info.is_admin = update_user.is_admin,
1078 UpdateField::Unspecified => unreachable!(),
1079 });
1080 lock.update_user(update_user);
1081 Ok(())
1082 }
1083
1084 async fn grant_privilege(
1087 &self,
1088 users: Vec<UserId>,
1089 privileges: Vec<GrantPrivilege>,
1090 with_grant_option: bool,
1091 _grantor: UserId,
1092 ) -> Result<()> {
1093 let privileges = privileges
1094 .into_iter()
1095 .map(|mut p| {
1096 p.action_with_opts
1097 .iter_mut()
1098 .for_each(|ao| ao.with_grant_option = with_grant_option);
1099 p
1100 })
1101 .collect::<Vec<_>>();
1102 for user_id in users {
1103 if let Some(u) = self.user_info.write().get_user_mut(user_id) {
1104 u.extend_privileges(privileges.clone());
1105 }
1106 }
1107 Ok(())
1108 }
1109
1110 async fn revoke_privilege(
1113 &self,
1114 users: Vec<UserId>,
1115 privileges: Vec<GrantPrivilege>,
1116 _granted_by: UserId,
1117 _revoke_by: UserId,
1118 revoke_grant_option: bool,
1119 _cascade: bool,
1120 ) -> Result<()> {
1121 for user_id in users {
1122 if let Some(u) = self.user_info.write().get_user_mut(user_id) {
1123 u.revoke_privileges(privileges.clone(), revoke_grant_option);
1124 }
1125 }
1126 Ok(())
1127 }
1128
1129 async fn alter_default_privilege(
1130 &self,
1131 _users: Vec<UserId>,
1132 _database_id: DatabaseId,
1133 _schemas: Vec<SchemaId>,
1134 _operation: AlterDefaultPrivilegeOperation,
1135 _operated_by: UserId,
1136 ) -> Result<()> {
1137 todo!()
1138 }
1139}
1140
1141impl MockUserInfoWriter {
1142 pub fn new(user_info: Arc<RwLock<UserInfoManager>>) -> Self {
1143 user_info.write().create_user(UserInfo {
1144 id: DEFAULT_SUPER_USER_ID,
1145 name: DEFAULT_SUPER_USER.to_owned(),
1146 is_super: true,
1147 can_create_db: true,
1148 can_create_user: true,
1149 can_login: true,
1150 ..Default::default()
1151 });
1152 user_info.write().create_user(UserInfo {
1153 id: DEFAULT_SUPER_USER_FOR_ADMIN_ID,
1154 name: DEFAULT_SUPER_USER_FOR_ADMIN.to_owned(),
1155 is_super: true,
1156 can_create_db: true,
1157 can_create_user: true,
1158 can_login: true,
1159 is_admin: true,
1160 ..Default::default()
1161 });
1162 Self {
1163 user_info,
1164 id: AtomicU32::new(NON_RESERVED_USER_ID.as_raw_id()),
1165 }
1166 }
1167
1168 fn gen_id(&self) -> u32 {
1169 self.id.fetch_add(1, Ordering::SeqCst)
1170 }
1171}
1172
/// Stateless mock of `FrontendMetaClient`; it holds no data, and its trait methods either
/// return fixed values or panic with `unimplemented!()`.
pub struct MockFrontendMetaClient {}
1174
1175#[async_trait::async_trait]
1176impl FrontendMetaClient for MockFrontendMetaClient {
1177 async fn try_unregister(&self) {}
1178
1179 async fn flush(&self, _database_id: DatabaseId) -> RpcResult<HummockVersionId> {
1180 Ok(INVALID_VERSION_ID)
1181 }
1182
1183 async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult<Vec<u32>> {
1184 Ok(vec![])
1185 }
1186
1187 async fn list_table_fragments(
1188 &self,
1189 _table_ids: &[JobId],
1190 ) -> RpcResult<HashMap<JobId, TableFragmentInfo>> {
1191 Ok(HashMap::default())
1192 }
1193
1194 async fn list_streaming_job_states(&self) -> RpcResult<Vec<StreamingJobState>> {
1195 Ok(vec![])
1196 }
1197
1198 async fn list_fragment_distribution(
1199 &self,
1200 _include_node: bool,
1201 ) -> RpcResult<Vec<FragmentDistribution>> {
1202 Ok(vec![])
1203 }
1204
1205 async fn list_creating_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
1206 Ok(vec![])
1207 }
1208
1209 async fn list_actor_states(&self) -> RpcResult<Vec<ActorState>> {
1210 Ok(vec![])
1211 }
1212
1213 async fn list_actor_splits(&self) -> RpcResult<Vec<ActorSplit>> {
1214 Ok(vec![])
1215 }
1216
1217 async fn list_meta_snapshots(&self) -> RpcResult<Vec<MetaSnapshotMetadata>> {
1218 Ok(vec![])
1219 }
1220
1221 async fn list_sink_log_store_tables(
1222 &self,
1223 ) -> RpcResult<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>> {
1224 Ok(vec![])
1225 }
1226
1227 async fn set_system_param(
1228 &self,
1229 _param: String,
1230 _value: Option<String>,
1231 ) -> RpcResult<Option<SystemParamsReader>> {
1232 Ok(Some(SystemParams::default().into()))
1233 }
1234
1235 async fn get_session_params(&self) -> RpcResult<SessionConfig> {
1236 Ok(Default::default())
1237 }
1238
1239 async fn set_session_param(&self, _param: String, _value: Option<String>) -> RpcResult<String> {
1240 Ok("".to_owned())
1241 }
1242
1243 async fn get_ddl_progress(&self) -> RpcResult<Vec<DdlProgress>> {
1244 Ok(vec![])
1245 }
1246
1247 async fn get_tables(
1248 &self,
1249 _table_ids: Vec<crate::catalog::TableId>,
1250 _include_dropped_tables: bool,
1251 ) -> RpcResult<HashMap<crate::catalog::TableId, Table>> {
1252 Ok(HashMap::new())
1253 }
1254
1255 async fn list_hummock_pinned_versions(&self) -> RpcResult<Vec<(WorkerId, HummockVersionId)>> {
1256 unimplemented!()
1257 }
1258
1259 async fn list_refresh_table_states(&self) -> RpcResult<Vec<RefreshTableState>> {
1260 unimplemented!()
1261 }
1262
1263 async fn get_hummock_current_version(&self) -> RpcResult<HummockVersion> {
1264 Ok(HummockVersion::default())
1265 }
1266
1267 async fn get_hummock_checkpoint_version(&self) -> RpcResult<HummockVersion> {
1268 unimplemented!()
1269 }
1270
1271 async fn list_version_deltas(&self) -> RpcResult<Vec<HummockVersionDelta>> {
1272 unimplemented!()
1273 }
1274
1275 async fn list_branched_objects(&self) -> RpcResult<Vec<BranchedObject>> {
1276 unimplemented!()
1277 }
1278
1279 async fn list_hummock_compaction_group_configs(&self) -> RpcResult<Vec<CompactionGroupInfo>> {
1280 unimplemented!()
1281 }
1282
1283 async fn list_hummock_active_write_limits(
1284 &self,
1285 ) -> RpcResult<HashMap<CompactionGroupId, WriteLimit>> {
1286 unimplemented!()
1287 }
1288
1289 async fn list_hummock_meta_configs(&self) -> RpcResult<HashMap<String, String>> {
1290 unimplemented!()
1291 }
1292
1293 async fn list_event_log(&self) -> RpcResult<Vec<EventLog>> {
1294 Ok(vec![])
1295 }
1296
1297 async fn list_compact_task_assignment(&self) -> RpcResult<Vec<CompactTaskAssignment>> {
1298 unimplemented!()
1299 }
1300
1301 async fn list_all_nodes(&self) -> RpcResult<Vec<WorkerNode>> {
1302 Ok(vec![])
1303 }
1304
1305 async fn list_compact_task_progress(&self) -> RpcResult<Vec<CompactTaskProgress>> {
1306 unimplemented!()
1307 }
1308
1309 async fn recover(&self) -> RpcResult<()> {
1310 unimplemented!()
1311 }
1312
1313 async fn backup_meta(&self, _remarks: Option<String>) -> RpcResult<u64> {
1314 unimplemented!()
1315 }
1316
1317 async fn get_backup_job_status(
1318 &self,
1319 _job_id: u64,
1320 ) -> RpcResult<(risingwave_pb::backup_service::BackupJobStatus, String)> {
1321 unimplemented!()
1322 }
1323
1324 async fn delete_meta_snapshot(&self, _snapshot_ids: &[u64]) -> RpcResult<()> {
1325 unimplemented!()
1326 }
1327
1328 async fn apply_throttle(
1329 &self,
1330 _throttle_target: PbThrottleTarget,
1331 _throttle_type: risingwave_pb::common::PbThrottleType,
1332 _id: u32,
1333 _rate_limit: Option<u32>,
1334 ) -> RpcResult<()> {
1335 unimplemented!()
1336 }
1337
1338 async fn alter_fragment_parallelism(
1339 &self,
1340 _fragment_ids: Vec<FragmentId>,
1341 _parallelism: Option<PbTableParallelism>,
1342 ) -> RpcResult<()> {
1343 unimplemented!()
1344 }
1345
1346 async fn get_cluster_recovery_status(&self) -> RpcResult<RecoveryStatus> {
1347 Ok(RecoveryStatus::StatusRunning)
1348 }
1349
1350 async fn get_cluster_limits(&self) -> RpcResult<Vec<ClusterLimit>> {
1351 Ok(vec![])
1352 }
1353
1354 async fn list_rate_limits(&self) -> RpcResult<Vec<RateLimitInfo>> {
1355 Ok(vec![])
1356 }
1357
1358 async fn list_cdc_progress(&self) -> RpcResult<HashMap<JobId, PbCdcProgress>> {
1359 Ok(HashMap::default())
1360 }
1361
1362 async fn get_meta_store_endpoint(&self) -> RpcResult<String> {
1363 unimplemented!()
1364 }
1365
1366 async fn alter_sink_props(
1367 &self,
1368 _sink_id: SinkId,
1369 _changed_props: BTreeMap<String, String>,
1370 _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1371 _connector_conn_ref: Option<ConnectionId>,
1372 ) -> RpcResult<()> {
1373 unimplemented!()
1374 }
1375
1376 async fn alter_iceberg_table_props(
1377 &self,
1378 _table_id: TableId,
1379 _sink_id: SinkId,
1380 _source_id: SourceId,
1381 _changed_props: BTreeMap<String, String>,
1382 _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1383 _connector_conn_ref: Option<ConnectionId>,
1384 ) -> RpcResult<()> {
1385 unimplemented!()
1386 }
1387
1388 async fn alter_source_connector_props(
1389 &self,
1390 _source_id: SourceId,
1391 _changed_props: BTreeMap<String, String>,
1392 _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1393 _connector_conn_ref: Option<ConnectionId>,
1394 ) -> RpcResult<()> {
1395 unimplemented!()
1396 }
1397
1398 async fn alter_connection_connector_props(
1399 &self,
1400 _connection_id: u32,
1401 _changed_props: BTreeMap<String, String>,
1402 _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1403 ) -> RpcResult<()> {
1404 Ok(())
1405 }
1406
1407 async fn list_hosted_iceberg_tables(&self) -> RpcResult<Vec<IcebergTable>> {
1408 unimplemented!()
1409 }
1410
1411 async fn list_iceberg_compaction_status(&self) -> RpcResult<Vec<IcebergCompactionStatus>> {
1412 Ok(vec![])
1413 }
1414
1415 async fn get_fragment_by_id(
1416 &self,
1417 _fragment_id: FragmentId,
1418 ) -> RpcResult<Option<FragmentDistribution>> {
1419 unimplemented!()
1420 }
1421
1422 async fn get_fragment_vnodes(
1423 &self,
1424 _fragment_id: FragmentId,
1425 ) -> RpcResult<Vec<(ActorId, Vec<u32>)>> {
1426 unimplemented!()
1427 }
1428
1429 async fn get_actor_vnodes(&self, _actor_id: ActorId) -> RpcResult<Vec<u32>> {
1430 unimplemented!()
1431 }
1432
1433 fn worker_id(&self) -> WorkerId {
1434 0.into()
1435 }
1436
1437 async fn set_sync_log_store_aligned(&self, _job_id: JobId, _aligned: bool) -> RpcResult<()> {
1438 Ok(())
1439 }
1440
1441 async fn compact_iceberg_table(&self, _sink_id: SinkId) -> RpcResult<u64> {
1442 Ok(1)
1443 }
1444
1445 async fn expire_iceberg_table_snapshots(&self, _sink_id: SinkId) -> RpcResult<()> {
1446 Ok(())
1447 }
1448
1449 async fn refresh(&self, _request: RefreshRequest) -> RpcResult<RefreshResponse> {
1450 Ok(RefreshResponse { status: None })
1451 }
1452
1453 fn cluster_id(&self) -> &str {
1454 "test-cluster-uuid"
1455 }
1456
1457 async fn list_unmigrated_tables(&self) -> RpcResult<HashMap<crate::catalog::TableId, String>> {
1458 unimplemented!()
1459 }
1460
1461 async fn get_hummock_table_change_log(
1462 &self,
1463 _start_epoch_inclusive: Option<u64>,
1464 _end_epoch_inclusive: Option<u64>,
1465 _table_ids: Option<HashSet<TableId>>,
1466 _exclude_empty: bool,
1467 _limit: Option<u32>,
1468 ) -> RpcResult<TableChangeLogs> {
1469 Ok(HashMap::default())
1470 }
1471
1472 async fn update_compaction_config(
1473 &self,
1474 _compaction_group_ids: Vec<risingwave_hummock_sdk::CompactionGroupId>,
1475 _configs: Vec<PbMutableConfig>,
1476 ) -> RpcResult<()> {
1477 Ok(())
1478 }
1479}
1480
/// Proto3 schema source (package `test`) available in test builds only.
///
/// It declares `TestRecord`, two variants of it (`TestRecordAlterType`, where `id` is a
/// `string` and `zipcode` an `int32`, and `TestRecordExt`, which adds a `name` field), plus
/// the nested `Country` and `City` messages they reference.
#[cfg(test)]
pub static PROTO_FILE_DATA: &str = r#"
    syntax = "proto3";
    package test;
    message TestRecord {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
    }
    message TestRecordAlterType {
      string id = 1;
      Country country = 3;
      int32 zipcode = 4;
      float rate = 5;
    }
    message TestRecordExt {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
      string name = 6;
    }
    message Country {
      string address = 1;
      City city = 2;
      string zipcode = 3;
    }
    message City {
      string address = 1;
      string zipcode = 2;
    }"#;
1513
1514pub fn create_proto_file(proto_data: &str) -> NamedTempFile {
1517 let in_file = Builder::new()
1518 .prefix("temp")
1519 .suffix(".proto")
1520 .rand_bytes(8)
1521 .tempfile()
1522 .unwrap();
1523
1524 let out_file = Builder::new()
1525 .prefix("temp")
1526 .suffix(".pb")
1527 .rand_bytes(8)
1528 .tempfile()
1529 .unwrap();
1530
1531 let mut file = in_file.as_file();
1532 file.write_all(proto_data.as_ref())
1533 .expect("writing binary to test file");
1534 file.flush().expect("flush temp file failed");
1535 let include_path = in_file
1536 .path()
1537 .parent()
1538 .unwrap()
1539 .to_string_lossy()
1540 .into_owned();
1541 let out_path = out_file.path().to_string_lossy().into_owned();
1542 let in_path = in_file.path().to_string_lossy().into_owned();
1543 let mut compile = std::process::Command::new("protoc");
1544
1545 let out = compile
1546 .arg("--include_imports")
1547 .arg("-I")
1548 .arg(include_path)
1549 .arg(format!("--descriptor_set_out={}", out_path))
1550 .arg(in_path)
1551 .output()
1552 .expect("failed to compile proto");
1553 if !out.status.success() {
1554 panic!("compile proto failed \n output: {:?}", out);
1555 }
1556 out_file
1557}