use std::collections::{BTreeMap, HashMap, HashSet};
use std::io::Write;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};

use futures_async_stream::for_await;
use parking_lot::RwLock;
use pgwire::net::{Address, AddressRef};
use pgwire::pg_response::StatementType;
use pgwire::pg_server::{SessionId, SessionManager, UserAuthenticator};
use pgwire::types::Row;
use risingwave_common::catalog::{
    AlterDatabaseParam, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER,
    DEFAULT_SUPER_USER_FOR_ADMIN, DEFAULT_SUPER_USER_FOR_ADMIN_ID, DEFAULT_SUPER_USER_ID,
    FunctionId, IndexId, NON_RESERVED_USER_ID, ObjectId, PG_CATALOG_SCHEMA_NAME,
    RW_CATALOG_SCHEMA_NAME, TableId,
};
use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat};
use risingwave_common::id::{ConnectionId, JobId, SourceId, SubscriptionId, ViewId, WorkerId};
use risingwave_common::session_config::SessionConfig;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::util::cluster_limit::ClusterLimit;
use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId, INVALID_VERSION_ID};
use risingwave_pb::backup_service::MetaSnapshotMetadata;
use risingwave_pb::catalog::{
    PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbStreamJobStatus,
    PbSubscription, PbTable, PbView, Table,
};
use risingwave_pb::common::{PbObjectType, WorkerNode};
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
use risingwave_pb::ddl_service::{
    DdlProgress, PbTableJobType, TableJobType, alter_name_request, alter_set_schema_request,
    alter_swap_rename_request, create_connection_request, streaming_job_resource_type,
};
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::{
    BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
};
use risingwave_pb::id::ActorId;
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::{
    EventLog, FragmentDistribution, ObjectDependency as PbObjectDependency, PbTableParallelism,
    PbThrottleTarget, RecoveryStatus, RefreshRequest, RefreshResponse, SystemParams,
    list_sink_log_store_tables_response,
};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
use risingwave_pb::user::update_user_request::UpdateField;
use risingwave_pb::user::{GrantPrivilege, UserInfo};
use risingwave_rpc_client::error::Result as RpcResult;
use tempfile::{Builder, NamedTempFile};

use crate::FrontendOpts;
use crate::catalog::catalog_service::CatalogWriter;
use crate::catalog::root_catalog::Catalog;
use crate::catalog::{DatabaseId, FragmentId, SchemaId, SecretId, SinkId};
use crate::error::{ErrorCode, Result, RwError};
use crate::handler::RwPgResponse;
use crate::meta_client::FrontendMetaClient;
use crate::scheduler::HummockSnapshotManagerRef;
use crate::session::{AuthContext, FrontendEnv, SessionImpl};
use crate::user::UserId;
use crate::user::user_manager::UserInfoManager;
use crate::user::user_service::UserInfoWriter;

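/// An embedded frontend over a mocked [`FrontendEnv`], letting planner and
/// handler tests run SQL end-to-end without a meta node or compute nodes.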
pub struct LocalFrontend {
    pub opts: FrontendOpts,
    env: FrontendEnv,
}

impl SessionManager for LocalFrontend {
    type Error = RwError;
    type Session = SessionImpl;

    fn create_dummy_session(
        &self,
        _database_id: DatabaseId,
    ) -> std::result::Result<Arc<Self::Session>, Self::Error> {
        unreachable!()
    }

    fn connect(
        &self,
        _database: &str,
        _user_name: &str,
        _peer_addr: AddressRef,
    ) -> std::result::Result<Arc<Self::Session>, Self::Error> {
        Ok(self.session_ref())
    }

    fn cancel_queries_in_session(&self, _session_id: SessionId) {
        unreachable!()
    }

    fn cancel_creating_jobs_in_session(&self, _session_id: SessionId) {
        unreachable!()
    }

    fn end_session(&self, _session: &Self::Session) {
        unreachable!()
    }
}

impl LocalFrontend {
    #[expect(clippy::unused_async)]
    pub async fn new(opts: FrontendOpts) -> Self {
        let env = FrontendEnv::mock();
        Self { opts, env }
    }

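    /// Runs a single SQL statement on a fresh default-superuser session and
    /// returns its response.
    ///
    /// A minimal usage sketch (assumes a Tokio test runtime and that
    /// `FrontendOpts::default()` is available; the SQL is illustrative):
    ///
    /// ```ignore
    /// let frontend = LocalFrontend::new(FrontendOpts::default()).await;
    /// frontend.run_sql("CREATE TABLE t (v1 int)").await.unwrap();
    /// ```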
    pub async fn run_sql(
        &self,
        sql: impl Into<String>,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        Box::pin(self.session_ref().run_statement(sql, vec![])).await
    }

    pub async fn run_sql_with_session(
        &self,
        session_ref: Arc<SessionImpl>,
        sql: impl Into<String>,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        Box::pin(session_ref.run_statement(sql, vec![])).await
    }

    pub async fn run_user_sql(
        &self,
        sql: impl Into<String>,
        database: String,
        user_name: String,
        user_id: UserId,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        Box::pin(
            self.session_user_ref(database, user_name, user_id)
                .run_statement(sql, vec![]),
        )
        .await
    }

    pub async fn query_formatted_result(&self, sql: impl Into<String>) -> Vec<String> {
        let mut rsp = self.run_sql(sql).await.unwrap();
        let mut res = vec![];
        #[for_await]
        for row_set in rsp.values_stream() {
            for row in row_set.unwrap() {
                res.push(format!("{:?}", row));
            }
        }
        res
    }

    pub async fn get_explain_output(&self, sql: impl Into<String>) -> String {
        let mut rsp = self.run_sql(sql).await.unwrap();
        assert_eq!(rsp.stmt_type(), StatementType::EXPLAIN);
        let mut res = String::new();
        #[for_await]
        for row_set in rsp.values_stream() {
            for row in row_set.unwrap() {
                let row: Row = row;
                let row = row.values()[0].as_ref().unwrap();
                res += std::str::from_utf8(row).unwrap();
                res += "\n";
            }
        }
        res
    }

    pub fn session_ref(&self) -> Arc<SessionImpl> {
        self.session_user_ref(
            DEFAULT_DATABASE_NAME.to_owned(),
            DEFAULT_SUPER_USER.to_owned(),
            DEFAULT_SUPER_USER_ID,
        )
    }

    pub fn session_user_ref(
        &self,
        database: String,
        user_name: String,
        user_id: UserId,
    ) -> Arc<SessionImpl> {
        Arc::new(SessionImpl::new(
            self.env.clone(),
            AuthContext::new(database, user_name, user_id),
            UserAuthenticator::None,
            (0, 0),
            Address::Tcp(SocketAddr::new(
                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
                6666,
            ))
            .into(),
            Default::default(),
        ))
    }
}

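/// Collects the single-column UTF-8 rows of an `EXPLAIN` response into one
/// newline-terminated string, panicking on any other statement type.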
pub async fn get_explain_output(mut rsp: RwPgResponse) -> String {
    if rsp.stmt_type() != StatementType::EXPLAIN {
        panic!("RESPONSE INVALID: {rsp:?}");
    }
    let mut res = String::new();
    #[for_await]
    for row_set in rsp.values_stream() {
        for row in row_set.unwrap() {
            let row: Row = row;
            let row = row.values()[0].as_ref().unwrap();
            res += std::str::from_utf8(row).unwrap();
            res += "\n";
        }
    }
    res
}

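/// A [`CatalogWriter`] that applies DDL directly to an in-memory [`Catalog`].
/// Ids come from a single atomic counter, and the two maps track
/// object-to-schema and schema-to-database ownership so that drops can be
/// resolved without a meta service.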
pub struct MockCatalogWriter {
    catalog: Arc<RwLock<Catalog>>,
    id: AtomicU32,
    table_id_to_schema_id: RwLock<HashMap<u32, SchemaId>>,
    schema_id_to_database_id: RwLock<HashMap<SchemaId, DatabaseId>>,
    hummock_snapshot_manager: HummockSnapshotManagerRef,
}

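// Note: only the DDL paths exercised by frontend unit tests are implemented
// below; the remaining methods intentionally stay `unreachable!()`/`todo!()`.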
#[async_trait::async_trait]
impl CatalogWriter for MockCatalogWriter {
    async fn create_database(
        &self,
        db_name: &str,
        owner: UserId,
        resource_group: &str,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
    ) -> Result<()> {
        let database_id = DatabaseId::new(self.gen_id());
        self.catalog.write().create_database(&PbDatabase {
            name: db_name.to_owned(),
            id: database_id,
            owner,
            resource_group: resource_group.to_owned(),
            barrier_interval_ms,
            checkpoint_frequency,
        });
        self.create_schema(database_id, DEFAULT_SCHEMA_NAME, owner)
            .await?;
        self.create_schema(database_id, PG_CATALOG_SCHEMA_NAME, owner)
            .await?;
        self.create_schema(database_id, RW_CATALOG_SCHEMA_NAME, owner)
            .await?;
        Ok(())
    }

    async fn create_schema(
        &self,
        db_id: DatabaseId,
        schema_name: &str,
        owner: UserId,
    ) -> Result<()> {
        let id = self.gen_id();
        self.catalog.write().create_schema(&PbSchema {
            id,
            name: schema_name.to_owned(),
            database_id: db_id,
            owner,
        });
        self.add_schema_id(id, db_id);
        Ok(())
    }

    async fn create_materialized_view(
        &self,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        _resource_type: streaming_job_resource_type::ResourceType,
        _if_not_exists: bool,
    ) -> Result<()> {
        table.id = self.gen_id();
        table.stream_job_status = PbStreamJobStatus::Created as _;
        table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
        self.catalog.write().create_table(&table);
        self.add_table_or_source_id(table.id.as_raw_id(), table.schema_id, table.database_id);
        self.insert_object_dependencies(table.id.as_object_id(), dependencies);
        self.hummock_snapshot_manager.add_table_for_test(table.id);
        Ok(())
    }

    async fn replace_materialized_view(
        &self,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
    ) -> Result<()> {
        table.stream_job_status = PbStreamJobStatus::Created as _;
        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
        self.catalog.write().update_table(&table);
        Ok(())
    }

    async fn create_view(&self, mut view: PbView, dependencies: HashSet<ObjectId>) -> Result<()> {
        view.id = self.gen_id();
        self.catalog.write().create_view(&view);
        self.add_table_or_source_id(view.id.as_raw_id(), view.schema_id, view.database_id);
        self.insert_object_dependencies(view.id.as_object_id(), dependencies);
        Ok(())
    }

    async fn create_table(
        &self,
        source: Option<PbSource>,
        mut table: PbTable,
        graph: StreamFragmentGraph,
        _job_type: PbTableJobType,
        if_not_exists: bool,
        dependencies: HashSet<ObjectId>,
    ) -> Result<()> {
        if let Some(source) = source {
            let source_id = self.create_source_inner(source)?;
            table.optional_associated_source_id = Some(source_id.into());
        }
        self.create_materialized_view(
            table,
            graph,
            dependencies,
            streaming_job_resource_type::ResourceType::Regular(true),
            if_not_exists,
        )
        .await?;
        Ok(())
    }

    async fn replace_table(
        &self,
        _source: Option<PbSource>,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
        _job_type: TableJobType,
    ) -> Result<()> {
        table.stream_job_status = PbStreamJobStatus::Created as _;
        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
        self.catalog.write().update_table(&table);
        Ok(())
    }

    async fn replace_source(&self, source: PbSource, _graph: StreamFragmentGraph) -> Result<()> {
        self.catalog.write().update_source(&source);
        Ok(())
    }

    async fn create_source(
        &self,
        source: PbSource,
        _graph: Option<StreamFragmentGraph>,
        _if_not_exists: bool,
    ) -> Result<()> {
        self.create_source_inner(source).map(|_| ())
    }

    async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        _if_not_exists: bool,
    ) -> Result<()> {
        let sink_id = self.create_sink_inner(sink, graph)?;
        self.insert_object_dependencies(sink_id.as_object_id(), dependencies);
        Ok(())
    }

    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
        self.create_subscription_inner(subscription)
    }

    async fn create_index(
        &self,
        mut index: PbIndex,
        mut index_table: PbTable,
        _graph: StreamFragmentGraph,
        _if_not_exists: bool,
    ) -> Result<()> {
        index_table.id = self.gen_id();
        index_table.stream_job_status = PbStreamJobStatus::Created as _;
        index_table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
        self.catalog.write().create_table(&index_table);
        self.add_table_or_index_id(
            index_table.id.as_raw_id(),
            index_table.schema_id,
            index_table.database_id,
        );

        index.id = index_table.id.as_raw_id().into();
        index.index_table_id = index_table.id;
        self.catalog.write().create_index(&index);
        Ok(())
    }

    async fn create_function(&self, _function: PbFunction) -> Result<()> {
        unreachable!()
    }

    async fn create_connection(
        &self,
        _connection_name: String,
        _database_id: DatabaseId,
        _schema_id: SchemaId,
        _owner_id: UserId,
        _connection: create_connection_request::Payload,
    ) -> Result<()> {
        unreachable!()
    }

    async fn create_secret(
        &self,
        _secret_name: String,
        _database_id: DatabaseId,
        _schema_id: SchemaId,
        _owner_id: UserId,
        _payload: Vec<u8>,
    ) -> Result<()> {
        unreachable!()
    }

    async fn comment_on(&self, _comment: PbComment) -> Result<()> {
        unreachable!()
    }

    async fn drop_table(
        &self,
        source_id: Option<SourceId>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        if let Some(source_id) = source_id {
            self.drop_table_or_source_id(source_id.as_raw_id());
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
        let indexes =
            self.catalog
                .read()
                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
        for index in indexes {
            self.drop_index(index.id, cascade).await?;
        }
        self.catalog
            .write()
            .drop_table(database_id, schema_id, table_id);
        if let Some(source_id) = source_id {
            self.catalog
                .write()
                .drop_source(database_id, schema_id, source_id);
        }
        Ok(())
    }

    async fn drop_view(&self, _view_id: ViewId, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
        let indexes =
            self.catalog
                .read()
                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
        for index in indexes {
            self.drop_index(index.id, cascade).await?;
        }
        self.catalog
            .write()
            .drop_table(database_id, schema_id, table_id);
        Ok(())
    }

    async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(source_id.as_raw_id());
        self.catalog
            .write()
            .drop_source(database_id, schema_id, source_id);
        Ok(())
    }

    async fn reset_source(&self, _source_id: SourceId) -> Result<()> {
        Ok(())
    }

    async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_sink_id(sink_id.as_raw_id());
        self.catalog
            .write()
            .drop_sink(database_id, schema_id, sink_id);
        Ok(())
    }

    async fn drop_subscription(
        &self,
        subscription_id: SubscriptionId,
        cascade: bool,
    ) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) =
            self.drop_table_or_subscription_id(subscription_id.as_raw_id());
        self.catalog
            .write()
            .drop_subscription(database_id, schema_id, subscription_id);
        Ok(())
    }

    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let &schema_id = self
            .table_id_to_schema_id
            .read()
            .get(&index_id.as_raw_id())
            .unwrap();
        let database_id = self.get_database_id_by_schema(schema_id);

        let index = {
            let catalog_reader = self.catalog.read();
            let schema_catalog = catalog_reader
                .get_schema_by_id(database_id, schema_id)
                .unwrap();
            schema_catalog.get_index_by_id(index_id).unwrap().clone()
        };

        let index_table_id = index.index_table().id;
        let (database_id, schema_id) = self.drop_table_or_index_id(index_id.as_raw_id());
        self.catalog
            .write()
            .drop_index(database_id, schema_id, index_id);
        self.catalog
            .write()
            .drop_table(database_id, schema_id, index_table_id);
        Ok(())
    }

    async fn drop_function(&self, _function_id: FunctionId, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_connection(&self, _connection_id: ConnectionId, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_secret(&self, _secret_id: SecretId, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_database(&self, database_id: DatabaseId) -> Result<()> {
        self.catalog.write().drop_database(database_id);
        Ok(())
    }

    async fn drop_schema(&self, schema_id: SchemaId, _cascade: bool) -> Result<()> {
        let database_id = self.drop_schema_id(schema_id);
        self.catalog.write().drop_schema(database_id, schema_id);
        Ok(())
    }

    async fn alter_name(
        &self,
        object_id: alter_name_request::Object,
        object_name: &str,
    ) -> Result<()> {
        match object_id {
            alter_name_request::Object::TableId(table_id) => {
                self.catalog
                    .write()
                    .alter_table_name_by_id(table_id, object_name);
                Ok(())
            }
            _ => {
                unimplemented!()
            }
        }
    }

    async fn alter_source(&self, source: PbSource) -> Result<()> {
        self.catalog.write().update_source(&source);
        Ok(())
    }

    async fn alter_owner(&self, object: Object, owner_id: UserId) -> Result<()> {
        for database in self.catalog.read().iter_databases() {
            for schema in database.iter_schemas() {
                match object {
                    Object::TableId(table_id) => {
                        if let Some(table) = schema.get_created_table_by_id(TableId::from(table_id))
                        {
                            let mut pb_table = table.to_prost();
                            pb_table.owner = owner_id;
                            self.catalog.write().update_table(&pb_table);
                            return Ok(());
                        }
                    }
                    _ => unreachable!(),
                }
            }
        }

        Err(ErrorCode::ItemNotFound(format!("object not found: {:?}", object)).into())
    }

    async fn alter_subscription_retention(
        &self,
        subscription_id: SubscriptionId,
        retention_seconds: u64,
        definition: String,
    ) -> Result<()> {
        for database in self.catalog.read().iter_databases() {
            for schema in database.iter_schemas() {
                if let Some(subscription) = schema.get_subscription_by_id(subscription_id) {
                    let mut pb_subscription = subscription.to_proto();
                    pb_subscription.retention_seconds = retention_seconds;
                    pb_subscription.definition = definition;
                    self.catalog.write().update_subscription(&pb_subscription);
                    return Ok(());
                }
            }
        }

        Err(
            ErrorCode::ItemNotFound(format!("subscription not found: {:?}", subscription_id))
                .into(),
        )
    }

    async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> Result<()> {
        match object {
            alter_set_schema_request::Object::TableId(table_id) => {
                let mut pb_table = {
                    let reader = self.catalog.read();
                    let table = reader.get_any_table_by_id(table_id)?.to_owned();
                    table.to_prost()
                };
                pb_table.schema_id = new_schema_id;
                self.catalog.write().update_table(&pb_table);
                self.table_id_to_schema_id
                    .write()
                    .insert(table_id.as_raw_id(), new_schema_id);
                Ok(())
            }
            _ => unreachable!(),
        }
    }

    async fn alter_parallelism(
        &self,
        _job_id: JobId,
        _parallelism: PbTableParallelism,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }

    async fn alter_backfill_parallelism(
        &self,
        _job_id: JobId,
        _parallelism: Option<PbTableParallelism>,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }

    async fn alter_config(
        &self,
        _job_id: JobId,
        _entries_to_add: HashMap<String, String>,
        _keys_to_remove: Vec<String>,
    ) -> Result<()> {
        todo!()
    }

    async fn alter_swap_rename(&self, _object: alter_swap_rename_request::Object) -> Result<()> {
        todo!()
    }

    async fn alter_secret(
        &self,
        _secret_id: SecretId,
        _secret_name: String,
        _database_id: DatabaseId,
        _schema_id: SchemaId,
        _owner_id: UserId,
        _payload: Vec<u8>,
    ) -> Result<()> {
        unreachable!()
    }

    async fn alter_resource_group(
        &self,
        _table_id: TableId,
        _resource_group: Option<String>,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }

    async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> Result<()> {
        let mut pb_database = {
            let reader = self.catalog.read();
            let database = reader.get_database_by_id(database_id)?.to_owned();
            database.to_prost()
        };
        match param {
            AlterDatabaseParam::BarrierIntervalMs(interval) => {
                pb_database.barrier_interval_ms = interval;
            }
            AlterDatabaseParam::CheckpointFrequency(frequency) => {
                pb_database.checkpoint_frequency = frequency;
            }
        }
        self.catalog.write().update_database(&pb_database);
        Ok(())
    }

    async fn create_iceberg_table(
        &self,
        _table_job_info: PbTableJobInfo,
        _sink_job_info: PbSinkJobInfo,
        _iceberg_source: PbSource,
        _if_not_exists: bool,
    ) -> Result<()> {
        todo!()
    }

    async fn wait(&self) -> Result<()> {
        Ok(())
    }
}

impl MockCatalogWriter {
    pub fn new(
        catalog: Arc<RwLock<Catalog>>,
        hummock_snapshot_manager: HummockSnapshotManagerRef,
    ) -> Self {
        catalog.write().create_database(&PbDatabase {
            id: 0.into(),
            name: DEFAULT_DATABASE_NAME.to_owned(),
            owner: DEFAULT_SUPER_USER_ID,
            resource_group: DEFAULT_RESOURCE_GROUP.to_owned(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        });
        catalog.write().create_schema(&PbSchema {
            id: 1.into(),
            name: DEFAULT_SCHEMA_NAME.to_owned(),
            database_id: 0.into(),
            owner: DEFAULT_SUPER_USER_ID,
        });
        catalog.write().create_schema(&PbSchema {
            id: 2.into(),
            name: PG_CATALOG_SCHEMA_NAME.to_owned(),
            database_id: 0.into(),
            owner: DEFAULT_SUPER_USER_ID,
        });
        catalog.write().create_schema(&PbSchema {
            id: 3.into(),
            name: RW_CATALOG_SCHEMA_NAME.to_owned(),
            database_id: 0.into(),
            owner: DEFAULT_SUPER_USER_ID,
        });
        let mut map: HashMap<SchemaId, DatabaseId> = HashMap::new();
        map.insert(1_u32.into(), 0_u32.into());
        map.insert(2_u32.into(), 0_u32.into());
        map.insert(3_u32.into(), 0_u32.into());
        Self {
            catalog,
            id: AtomicU32::new(3),
            table_id_to_schema_id: Default::default(),
            schema_id_to_database_id: RwLock::new(map),
            hummock_snapshot_manager,
        }
    }

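    /// Reserves and returns the next object id. `fetch_add` returns the
    /// previous counter value, so `+ 1` yields the freshly reserved id; the
    /// counter starts at 3 because ids 0 through 3 belong to the default
    /// database and its built-in schemas created in [`MockCatalogWriter::new`].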
    fn gen_id<T: From<u32>>(&self) -> T {
        (self.id.fetch_add(1, Ordering::SeqCst) + 1).into()
    }

    fn add_table_or_source_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn drop_table_or_source_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn add_table_or_sink_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn add_table_or_subscription_id(
        &self,
        table_id: u32,
        schema_id: SchemaId,
        _database_id: DatabaseId,
    ) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn add_table_or_index_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn drop_table_or_sink_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn drop_table_or_subscription_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn drop_table_or_index_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn add_schema_id(&self, schema_id: SchemaId, database_id: DatabaseId) {
        self.schema_id_to_database_id
            .write()
            .insert(schema_id, database_id);
    }

    fn drop_schema_id(&self, schema_id: SchemaId) -> DatabaseId {
        self.schema_id_to_database_id
            .write()
            .remove(&schema_id)
            .unwrap()
    }

    fn create_source_inner(&self, mut source: PbSource) -> Result<SourceId> {
        source.id = self.gen_id();
        self.catalog.write().create_source(&source);
        self.add_table_or_source_id(source.id.as_raw_id(), source.schema_id, source.database_id);
        Ok(source.id)
    }

    fn create_sink_inner(&self, mut sink: PbSink, _graph: StreamFragmentGraph) -> Result<SinkId> {
        sink.id = self.gen_id();
        sink.stream_job_status = PbStreamJobStatus::Created as _;
        self.catalog.write().create_sink(&sink);
        self.add_table_or_sink_id(sink.id.as_raw_id(), sink.schema_id, sink.database_id);
        Ok(sink.id)
    }

    fn create_subscription_inner(&self, mut subscription: PbSubscription) -> Result<()> {
        subscription.id = self.gen_id();
        self.catalog.write().create_subscription(&subscription);
        self.add_table_or_subscription_id(
            subscription.id.as_raw_id(),
            subscription.schema_id,
            subscription.database_id,
        );
        Ok(())
    }

    fn get_database_id_by_schema(&self, schema_id: SchemaId) -> DatabaseId {
        *self
            .schema_id_to_database_id
            .read()
            .get(&schema_id)
            .unwrap()
    }

    fn get_object_type(&self, object_id: ObjectId) -> PbObjectType {
        let catalog = self.catalog.read();
        for database in catalog.iter_databases() {
            for schema in database.iter_schemas() {
                if let Some(table) = schema.get_created_table_by_id(object_id.as_table_id()) {
                    return if table.is_mview() {
                        PbObjectType::Mview
                    } else {
                        PbObjectType::Table
                    };
                }
                if schema.get_source_by_id(object_id.as_source_id()).is_some() {
                    return PbObjectType::Source;
                }
                if schema.get_view_by_id(object_id.as_view_id()).is_some() {
                    return PbObjectType::View;
                }
                if schema.get_index_by_id(object_id.as_index_id()).is_some() {
                    return PbObjectType::Index;
                }
            }
        }
        PbObjectType::Unspecified
    }

    fn insert_object_dependencies(&self, object_id: ObjectId, dependencies: HashSet<ObjectId>) {
        if dependencies.is_empty() {
            return;
        }
        let dependencies = dependencies
            .into_iter()
            .map(|referenced_object_id| PbObjectDependency {
                object_id,
                referenced_object_id,
                referenced_object_type: self.get_object_type(referenced_object_id) as i32,
            })
            .collect();
        self.catalog
            .write()
            .insert_object_dependencies(dependencies);
    }
}

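/// A [`UserInfoWriter`] backed by an in-memory [`UserInfoManager`],
/// pre-seeded with the default superusers in [`MockUserInfoWriter::new`].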
pub struct MockUserInfoWriter {
    id: AtomicU32,
    user_info: Arc<RwLock<UserInfoManager>>,
}

#[async_trait::async_trait]
impl UserInfoWriter for MockUserInfoWriter {
    async fn create_user(&self, mut user: UserInfo) -> Result<()> {
        user.id = self.gen_id().into();
        self.user_info.write().create_user(user);
        Ok(())
    }

    async fn drop_user(&self, id: UserId) -> Result<()> {
        self.user_info.write().drop_user(id);
        Ok(())
    }

    async fn update_user(
        &self,
        update_user: UserInfo,
        update_fields: Vec<UpdateField>,
    ) -> Result<()> {
        let mut lock = self.user_info.write();
        let id = update_user.get_id();
        let Some(old_name) = lock.get_user_name_by_id(id) else {
            return Ok(());
        };
        let mut user_info = lock.get_user_by_name(&old_name).unwrap().to_prost();
        update_fields.into_iter().for_each(|field| match field {
            UpdateField::Super => user_info.is_super = update_user.is_super,
            UpdateField::Login => user_info.can_login = update_user.can_login,
            UpdateField::CreateDb => user_info.can_create_db = update_user.can_create_db,
            UpdateField::CreateUser => user_info.can_create_user = update_user.can_create_user,
            UpdateField::AuthInfo => user_info.auth_info.clone_from(&update_user.auth_info),
            UpdateField::Rename => user_info.name.clone_from(&update_user.name),
            UpdateField::Admin => user_info.is_admin = update_user.is_admin,
            UpdateField::Unspecified => unreachable!(),
        });
        // Apply the merged record so that only the requested fields change;
        // passing the raw request here would discard the merge above.
        lock.update_user(user_info);
        Ok(())
    }

    async fn grant_privilege(
        &self,
        users: Vec<UserId>,
        privileges: Vec<GrantPrivilege>,
        with_grant_option: bool,
        _grantor: UserId,
    ) -> Result<()> {
        let privileges = privileges
            .into_iter()
            .map(|mut p| {
                p.action_with_opts
                    .iter_mut()
                    .for_each(|ao| ao.with_grant_option = with_grant_option);
                p
            })
            .collect::<Vec<_>>();
        for user_id in users {
            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
                u.extend_privileges(privileges.clone());
            }
        }
        Ok(())
    }

    async fn revoke_privilege(
        &self,
        users: Vec<UserId>,
        privileges: Vec<GrantPrivilege>,
        _granted_by: UserId,
        _revoke_by: UserId,
        revoke_grant_option: bool,
        _cascade: bool,
    ) -> Result<()> {
        for user_id in users {
            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
                u.revoke_privileges(privileges.clone(), revoke_grant_option);
            }
        }
        Ok(())
    }

    async fn alter_default_privilege(
        &self,
        _users: Vec<UserId>,
        _database_id: DatabaseId,
        _schemas: Vec<SchemaId>,
        _operation: AlterDefaultPrivilegeOperation,
        _operated_by: UserId,
    ) -> Result<()> {
        todo!()
    }
}

impl MockUserInfoWriter {
    pub fn new(user_info: Arc<RwLock<UserInfoManager>>) -> Self {
        user_info.write().create_user(UserInfo {
            id: DEFAULT_SUPER_USER_ID,
            name: DEFAULT_SUPER_USER.to_owned(),
            is_super: true,
            can_create_db: true,
            can_create_user: true,
            can_login: true,
            ..Default::default()
        });
        user_info.write().create_user(UserInfo {
            id: DEFAULT_SUPER_USER_FOR_ADMIN_ID,
            name: DEFAULT_SUPER_USER_FOR_ADMIN.to_owned(),
            is_super: true,
            can_create_db: true,
            can_create_user: true,
            can_login: true,
            is_admin: true,
            ..Default::default()
        });
        Self {
            user_info,
            id: AtomicU32::new(NON_RESERVED_USER_ID.as_raw_id()),
        }
    }

    fn gen_id(&self) -> u32 {
        self.id.fetch_add(1, Ordering::SeqCst)
    }
}

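/// A [`FrontendMetaClient`] stub: queries that tests rely on return empty or
/// default values, while RPCs that tests never issue are `unimplemented!()`.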
pub struct MockFrontendMetaClient {}

#[async_trait::async_trait]
impl FrontendMetaClient for MockFrontendMetaClient {
    async fn try_unregister(&self) {}

    async fn flush(&self, _database_id: DatabaseId) -> RpcResult<HummockVersionId> {
        Ok(INVALID_VERSION_ID)
    }

    async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult<Vec<u32>> {
        Ok(vec![])
    }

    async fn list_table_fragments(
        &self,
        _table_ids: &[JobId],
    ) -> RpcResult<HashMap<JobId, TableFragmentInfo>> {
        Ok(HashMap::default())
    }

    async fn list_streaming_job_states(&self) -> RpcResult<Vec<StreamingJobState>> {
        Ok(vec![])
    }

    async fn list_fragment_distribution(
        &self,
        _include_node: bool,
    ) -> RpcResult<Vec<FragmentDistribution>> {
        Ok(vec![])
    }

    async fn list_creating_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
        Ok(vec![])
    }

    async fn list_actor_states(&self) -> RpcResult<Vec<ActorState>> {
        Ok(vec![])
    }

    async fn list_actor_splits(&self) -> RpcResult<Vec<ActorSplit>> {
        Ok(vec![])
    }

    async fn list_meta_snapshots(&self) -> RpcResult<Vec<MetaSnapshotMetadata>> {
        Ok(vec![])
    }

    async fn list_sink_log_store_tables(
        &self,
    ) -> RpcResult<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>> {
        Ok(vec![])
    }

    async fn set_system_param(
        &self,
        _param: String,
        _value: Option<String>,
    ) -> RpcResult<Option<SystemParamsReader>> {
        Ok(Some(SystemParams::default().into()))
    }

    async fn get_session_params(&self) -> RpcResult<SessionConfig> {
        Ok(Default::default())
    }

    async fn set_session_param(&self, _param: String, _value: Option<String>) -> RpcResult<String> {
        Ok("".to_owned())
    }

    async fn get_ddl_progress(&self) -> RpcResult<Vec<DdlProgress>> {
        Ok(vec![])
    }

    async fn get_tables(
        &self,
        _table_ids: Vec<crate::catalog::TableId>,
        _include_dropped_tables: bool,
    ) -> RpcResult<HashMap<crate::catalog::TableId, Table>> {
        Ok(HashMap::new())
    }

    async fn list_hummock_pinned_versions(&self) -> RpcResult<Vec<(WorkerId, HummockVersionId)>> {
        unimplemented!()
    }

    async fn list_refresh_table_states(&self) -> RpcResult<Vec<RefreshTableState>> {
        unimplemented!()
    }

    async fn get_hummock_current_version(&self) -> RpcResult<HummockVersion> {
        Ok(HummockVersion::default())
    }

    async fn get_hummock_checkpoint_version(&self) -> RpcResult<HummockVersion> {
        unimplemented!()
    }

    async fn list_version_deltas(&self) -> RpcResult<Vec<HummockVersionDelta>> {
        unimplemented!()
    }

    async fn list_branched_objects(&self) -> RpcResult<Vec<BranchedObject>> {
        unimplemented!()
    }

    async fn list_hummock_compaction_group_configs(&self) -> RpcResult<Vec<CompactionGroupInfo>> {
        unimplemented!()
    }

    async fn list_hummock_active_write_limits(
        &self,
    ) -> RpcResult<HashMap<CompactionGroupId, WriteLimit>> {
        unimplemented!()
    }

    async fn list_hummock_meta_configs(&self) -> RpcResult<HashMap<String, String>> {
        unimplemented!()
    }

    async fn list_event_log(&self) -> RpcResult<Vec<EventLog>> {
        Ok(vec![])
    }

    async fn list_compact_task_assignment(&self) -> RpcResult<Vec<CompactTaskAssignment>> {
        unimplemented!()
    }

    async fn list_all_nodes(&self) -> RpcResult<Vec<WorkerNode>> {
        Ok(vec![])
    }

    async fn list_compact_task_progress(&self) -> RpcResult<Vec<CompactTaskProgress>> {
        unimplemented!()
    }

    async fn recover(&self) -> RpcResult<()> {
        unimplemented!()
    }

    async fn apply_throttle(
        &self,
        _throttle_target: PbThrottleTarget,
        _throttle_type: risingwave_pb::common::PbThrottleType,
        _id: u32,
        _rate_limit: Option<u32>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn alter_fragment_parallelism(
        &self,
        _fragment_ids: Vec<FragmentId>,
        _parallelism: Option<PbTableParallelism>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn get_cluster_recovery_status(&self) -> RpcResult<RecoveryStatus> {
        Ok(RecoveryStatus::StatusRunning)
    }

    async fn get_cluster_limits(&self) -> RpcResult<Vec<ClusterLimit>> {
        Ok(vec![])
    }

    async fn list_rate_limits(&self) -> RpcResult<Vec<RateLimitInfo>> {
        Ok(vec![])
    }

    async fn list_cdc_progress(&self) -> RpcResult<HashMap<JobId, PbCdcProgress>> {
        Ok(HashMap::default())
    }

    async fn get_meta_store_endpoint(&self) -> RpcResult<String> {
        unimplemented!()
    }

    async fn alter_sink_props(
        &self,
        _sink_id: SinkId,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
        _connector_conn_ref: Option<ConnectionId>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn alter_iceberg_table_props(
        &self,
        _table_id: TableId,
        _sink_id: SinkId,
        _source_id: SourceId,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
        _connector_conn_ref: Option<ConnectionId>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn alter_source_connector_props(
        &self,
        _source_id: SourceId,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
        _connector_conn_ref: Option<ConnectionId>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn alter_connection_connector_props(
        &self,
        _connection_id: u32,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
    ) -> RpcResult<()> {
        Ok(())
    }

    async fn list_hosted_iceberg_tables(&self) -> RpcResult<Vec<IcebergTable>> {
        unimplemented!()
    }

    async fn get_fragment_by_id(
        &self,
        _fragment_id: FragmentId,
    ) -> RpcResult<Option<FragmentDistribution>> {
        unimplemented!()
    }

    async fn get_fragment_vnodes(
        &self,
        _fragment_id: FragmentId,
    ) -> RpcResult<Vec<(ActorId, Vec<u32>)>> {
        unimplemented!()
    }

    async fn get_actor_vnodes(&self, _actor_id: ActorId) -> RpcResult<Vec<u32>> {
        unimplemented!()
    }

    fn worker_id(&self) -> WorkerId {
        0.into()
    }

    async fn set_sync_log_store_aligned(&self, _job_id: JobId, _aligned: bool) -> RpcResult<()> {
        Ok(())
    }

    async fn compact_iceberg_table(&self, _sink_id: SinkId) -> RpcResult<u64> {
        Ok(1)
    }

    async fn expire_iceberg_table_snapshots(&self, _sink_id: SinkId) -> RpcResult<()> {
        Ok(())
    }

    async fn refresh(&self, _request: RefreshRequest) -> RpcResult<RefreshResponse> {
        Ok(RefreshResponse { status: None })
    }

    fn cluster_id(&self) -> &str {
        "test-cluster-uuid"
    }

    async fn list_unmigrated_tables(&self) -> RpcResult<HashMap<crate::catalog::TableId, String>> {
        unimplemented!()
    }
}

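/// Protobuf schema compiled on the fly in tests; relative to `TestRecord`,
/// `TestRecordAlterType` changes field types and `TestRecordExt` adds a
/// `name` field, which is useful for schema-change test cases.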
#[cfg(test)]
pub static PROTO_FILE_DATA: &str = r#"
    syntax = "proto3";
    package test;
    message TestRecord {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
    }
    message TestRecordAlterType {
      string id = 1;
      Country country = 3;
      int32 zipcode = 4;
      float rate = 5;
    }
    message TestRecordExt {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
      string name = 6;
    }
    message Country {
      string address = 1;
      City city = 2;
      string zipcode = 3;
    }
    message City {
      string address = 1;
      string zipcode = 2;
    }"#;

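/// Writes `proto_data` to a temporary `.proto` file, compiles it with the
/// system `protoc` into a file descriptor set, and returns the temporary
/// `.pb` file (the returned handle keeps the file alive).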
pub fn create_proto_file(proto_data: &str) -> NamedTempFile {
    let in_file = Builder::new()
        .prefix("temp")
        .suffix(".proto")
        .rand_bytes(8)
        .tempfile()
        .unwrap();

    let out_file = Builder::new()
        .prefix("temp")
        .suffix(".pb")
        .rand_bytes(8)
        .tempfile()
        .unwrap();

    let mut file = in_file.as_file();
    file.write_all(proto_data.as_ref())
        .expect("writing binary to test file");
    file.flush().expect("flush temp file failed");
    let include_path = in_file
        .path()
        .parent()
        .unwrap()
        .to_string_lossy()
        .into_owned();
    let out_path = out_file.path().to_string_lossy().into_owned();
    let in_path = in_file.path().to_string_lossy().into_owned();
    let mut compile = std::process::Command::new("protoc");

    let out = compile
        .arg("--include_imports")
        .arg("-I")
        .arg(include_path)
        .arg(format!("--descriptor_set_out={}", out_path))
        .arg(in_path)
        .output()
        .expect("failed to compile proto");
    if !out.status.success() {
        panic!("compile proto failed\noutput: {:?}", out);
    }
    out_file
}