1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::io::Write;
17use std::net::{IpAddr, Ipv4Addr, SocketAddr};
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU32, Ordering};
20
21use futures_async_stream::for_await;
22use parking_lot::RwLock;
23use pgwire::net::{Address, AddressRef};
24use pgwire::pg_response::StatementType;
25use pgwire::pg_server::{SessionId, SessionManager, UserAuthenticator};
26use pgwire::types::Row;
27use risingwave_common::catalog::{
28 AlterDatabaseParam, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER,
29 DEFAULT_SUPER_USER_FOR_ADMIN, DEFAULT_SUPER_USER_FOR_ADMIN_ID, DEFAULT_SUPER_USER_ID,
30 FunctionId, IndexId, NON_RESERVED_USER_ID, ObjectId, PG_CATALOG_SCHEMA_NAME,
31 RW_CATALOG_SCHEMA_NAME, TableId,
32};
33use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat};
34use risingwave_common::id::{ConnectionId, JobId, SourceId, SubscriptionId, ViewId, WorkerId};
35use risingwave_common::session_config::SessionConfig;
36use risingwave_common::system_param::reader::SystemParamsReader;
37use risingwave_common::util::cluster_limit::ClusterLimit;
38use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
39use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
40use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId, INVALID_VERSION_ID};
41use risingwave_pb::backup_service::MetaSnapshotMetadata;
42use risingwave_pb::catalog::{
43 PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbStreamJobStatus,
44 PbSubscription, PbTable, PbView, Table,
45};
46use risingwave_pb::common::{PbObjectType, WorkerNode};
47use risingwave_pb::ddl_service::alter_owner_request::Object;
48use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
49use risingwave_pb::ddl_service::{
50 DdlProgress, PbTableJobType, TableJobType, alter_name_request, alter_set_schema_request,
51 alter_swap_rename_request, create_connection_request, streaming_job_resource_type,
52};
53use risingwave_pb::hummock::write_limits::WriteLimit;
54use risingwave_pb::hummock::{
55 BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
56};
57use risingwave_pb::id::ActorId;
58use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
59use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
60use risingwave_pb::meta::list_actor_states_response::ActorState;
61use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
62use risingwave_pb::meta::list_iceberg_compaction_status_response::IcebergCompactionStatus;
63use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
64use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
65use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
66use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
67use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
68use risingwave_pb::meta::{
69 EventLog, FragmentDistribution, ObjectDependency as PbObjectDependency, PbTableParallelism,
70 PbThrottleTarget, RecoveryStatus, RefreshRequest, RefreshResponse, SystemParams,
71 list_sink_log_store_tables_response,
72};
73use risingwave_pb::secret::PbSecretRef;
74use risingwave_pb::stream_plan::StreamFragmentGraph;
75use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
76use risingwave_pb::user::update_user_request::UpdateField;
77use risingwave_pb::user::{GrantPrivilege, UserInfo};
78use risingwave_rpc_client::error::Result as RpcResult;
79use tempfile::{Builder, NamedTempFile};
80
81use crate::FrontendOpts;
82use crate::catalog::catalog_service::CatalogWriter;
83use crate::catalog::root_catalog::Catalog;
84use crate::catalog::{DatabaseId, FragmentId, SchemaId, SecretId, SinkId};
85use crate::error::{ErrorCode, Result, RwError};
86use crate::handler::RwPgResponse;
87use crate::meta_client::FrontendMetaClient;
88use crate::scheduler::HummockSnapshotManagerRef;
89use crate::session::{AuthContext, FrontendEnv, SessionImpl};
90use crate::user::UserId;
91use crate::user::user_manager::UserInfoManager;
92use crate::user::user_service::UserInfoWriter;
93
/// An embedded, in-process frontend used by unit tests: it wraps a mocked
/// `FrontendEnv` so SQL statements can be parsed/planned without a real
/// meta service or compute nodes.
pub struct LocalFrontend {
    // Options the frontend was constructed with (exposed for tests).
    pub opts: FrontendOpts,
    // Mocked environment shared by every session this frontend creates.
    env: FrontendEnv,
}
99
impl SessionManager for LocalFrontend {
    type Error = RwError;
    type Session = SessionImpl;

    // Tests never create dummy sessions through the manager interface.
    fn create_dummy_session(
        &self,
        _database_id: DatabaseId,
    ) -> std::result::Result<Arc<Self::Session>, Self::Error> {
        unreachable!()
    }

    // The only session entry point tests exercise: the database, user and
    // peer address are ignored and a fresh default super-user session is
    // handed out instead.
    fn connect(
        &self,
        _database: &str,
        _user_name: &str,
        _peer_addr: AddressRef,
    ) -> std::result::Result<Arc<Self::Session>, Self::Error> {
        Ok(self.session_ref())
    }

    // Session-lifecycle management is not exercised in tests.
    fn cancel_queries_in_session(&self, _session_id: SessionId) {
        unreachable!()
    }

    fn cancel_creating_jobs_in_session(&self, _session_id: SessionId) {
        unreachable!()
    }

    fn end_session(&self, _session: &Self::Session) {
        unreachable!()
    }
}
132
133impl LocalFrontend {
134 #[expect(clippy::unused_async)]
135 pub async fn new(opts: FrontendOpts) -> Self {
136 let env = FrontendEnv::mock();
137 Self { opts, env }
138 }
139
140 pub async fn run_sql(
141 &self,
142 sql: impl Into<String>,
143 ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
144 let sql: Arc<str> = Arc::from(sql.into());
145 Box::pin(self.session_ref().run_statement(sql, vec![])).await
146 }
147
148 pub async fn run_sql_with_session(
149 &self,
150 session_ref: Arc<SessionImpl>,
151 sql: impl Into<String>,
152 ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
153 let sql: Arc<str> = Arc::from(sql.into());
154 Box::pin(session_ref.run_statement(sql, vec![])).await
155 }
156
157 pub async fn run_user_sql(
158 &self,
159 sql: impl Into<String>,
160 database: String,
161 user_name: String,
162 user_id: UserId,
163 ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
164 let sql: Arc<str> = Arc::from(sql.into());
165 Box::pin(
166 self.session_user_ref(database, user_name, user_id)
167 .run_statement(sql, vec![]),
168 )
169 .await
170 }
171
172 pub async fn query_formatted_result(&self, sql: impl Into<String>) -> Vec<String> {
173 let mut rsp = self.run_sql(sql).await.unwrap();
174 let mut res = vec![];
175 #[for_await]
176 for row_set in rsp.values_stream() {
177 for row in row_set.unwrap() {
178 res.push(format!("{:?}", row));
179 }
180 }
181 res
182 }
183
184 pub async fn get_explain_output(&self, sql: impl Into<String>) -> String {
185 let mut rsp = self.run_sql(sql).await.unwrap();
186 assert_eq!(rsp.stmt_type(), StatementType::EXPLAIN);
187 let mut res = String::new();
188 #[for_await]
189 for row_set in rsp.values_stream() {
190 for row in row_set.unwrap() {
191 let row: Row = row;
192 let row = row.values()[0].as_ref().unwrap();
193 res += std::str::from_utf8(row).unwrap();
194 res += "\n";
195 }
196 }
197 res
198 }
199
200 pub fn session_ref(&self) -> Arc<SessionImpl> {
202 self.session_user_ref(
203 DEFAULT_DATABASE_NAME.to_owned(),
204 DEFAULT_SUPER_USER.to_owned(),
205 DEFAULT_SUPER_USER_ID,
206 )
207 }
208
209 pub fn session_user_ref(
210 &self,
211 database: String,
212 user_name: String,
213 user_id: UserId,
214 ) -> Arc<SessionImpl> {
215 Arc::new(SessionImpl::new(
216 self.env.clone(),
217 AuthContext::new(database, user_name, user_id),
218 UserAuthenticator::None,
219 (0, 0),
221 Address::Tcp(SocketAddr::new(
222 IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
223 6666,
224 ))
225 .into(),
226 Default::default(),
227 ))
228 }
229}
230
231pub async fn get_explain_output(mut rsp: RwPgResponse) -> String {
232 if rsp.stmt_type() != StatementType::EXPLAIN {
233 panic!("RESPONSE INVALID: {rsp:?}");
234 }
235 let mut res = String::new();
236 #[for_await]
237 for row_set in rsp.values_stream() {
238 for row in row_set.unwrap() {
239 let row: Row = row;
240 let row = row.values()[0].as_ref().unwrap();
241 res += std::str::from_utf8(row).unwrap();
242 res += "\n";
243 }
244 }
245 res
246}
247
/// A `CatalogWriter` that applies DDL directly to an in-memory `Catalog`,
/// bypassing the meta service entirely. It keeps small side tables mapping
/// object ids to their schema/database so drops can locate their parents.
pub struct MockCatalogWriter {
    catalog: Arc<RwLock<Catalog>>,
    // Monotonic id generator shared by all catalog objects.
    id: AtomicU32,
    // Raw object id (table/source/sink/index/subscription) -> owning schema.
    table_id_to_schema_id: RwLock<HashMap<u32, SchemaId>>,
    // Schema -> owning database, so drops can resolve the full path.
    schema_id_to_database_id: RwLock<HashMap<SchemaId, DatabaseId>>,
    hummock_snapshot_manager: HummockSnapshotManagerRef,
}
255
#[async_trait::async_trait]
impl CatalogWriter for MockCatalogWriter {
    // Creates the database plus its three built-in schemas (default,
    // pg_catalog, rw_catalog), mirroring what the real meta service does.
    async fn create_database(
        &self,
        db_name: &str,
        owner: UserId,
        resource_group: &str,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
    ) -> Result<()> {
        let database_id = DatabaseId::new(self.gen_id());
        self.catalog.write().create_database(&PbDatabase {
            name: db_name.to_owned(),
            id: database_id,
            owner,
            resource_group: resource_group.to_owned(),
            barrier_interval_ms,
            checkpoint_frequency,
        });
        self.create_schema(database_id, DEFAULT_SCHEMA_NAME, owner)
            .await?;
        self.create_schema(database_id, PG_CATALOG_SCHEMA_NAME, owner)
            .await?;
        self.create_schema(database_id, RW_CATALOG_SCHEMA_NAME, owner)
            .await?;
        Ok(())
    }

    async fn create_schema(
        &self,
        db_id: DatabaseId,
        schema_name: &str,
        owner: UserId,
    ) -> Result<()> {
        let id = self.gen_id();
        self.catalog.write().create_schema(&PbSchema {
            id,
            name: schema_name.to_owned(),
            database_id: db_id,
            owner,
        });
        // Record the schema's owning database for later drop_schema calls.
        self.add_schema_id(id, db_id);
        Ok(())
    }

    async fn create_materialized_view(
        &self,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        _resource_type: streaming_job_resource_type::ResourceType,
        _if_not_exists: bool,
    ) -> Result<()> {
        table.id = self.gen_id();
        // There is no barrier/recovery flow in the mock, so jobs are
        // immediately marked as created.
        table.stream_job_status = PbStreamJobStatus::Created as _;
        table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
        self.catalog.write().create_table(&table);
        self.add_table_or_source_id(table.id.as_raw_id(), table.schema_id, table.database_id);
        self.insert_object_dependencies(table.id.as_object_id(), dependencies);
        // Make the new table visible to the test snapshot manager.
        self.hummock_snapshot_manager.add_table_for_test(table.id);
        Ok(())
    }

    async fn replace_materialized_view(
        &self,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
    ) -> Result<()> {
        table.stream_job_status = PbStreamJobStatus::Created as _;
        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
        self.catalog.write().update_table(&table);
        Ok(())
    }

    async fn create_view(&self, mut view: PbView, dependencies: HashSet<ObjectId>) -> Result<()> {
        view.id = self.gen_id();
        self.catalog.write().create_view(&view);
        self.add_table_or_source_id(view.id.as_raw_id(), view.schema_id, view.database_id);
        self.insert_object_dependencies(view.id.as_object_id(), dependencies);
        Ok(())
    }

    // Creates the (optional) associated source first, then delegates to
    // create_materialized_view for the table itself.
    async fn create_table(
        &self,
        source: Option<PbSource>,
        mut table: PbTable,
        graph: StreamFragmentGraph,
        _job_type: PbTableJobType,
        if_not_exists: bool,
        dependencies: HashSet<ObjectId>,
    ) -> Result<()> {
        if let Some(source) = source {
            let source_id = self.create_source_inner(source)?;
            table.optional_associated_source_id = Some(source_id.into());
        }
        self.create_materialized_view(
            table,
            graph,
            dependencies,
            streaming_job_resource_type::ResourceType::Regular(true),
            if_not_exists,
        )
        .await?;
        Ok(())
    }

    async fn replace_table(
        &self,
        _source: Option<PbSource>,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
        _job_type: TableJobType,
    ) -> Result<()> {
        table.stream_job_status = PbStreamJobStatus::Created as _;
        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
        self.catalog.write().update_table(&table);
        Ok(())
    }

    async fn replace_source(&self, source: PbSource, _graph: StreamFragmentGraph) -> Result<()> {
        self.catalog.write().update_source(&source);
        Ok(())
    }

    async fn create_source(
        &self,
        source: PbSource,
        _graph: Option<StreamFragmentGraph>,
        _if_not_exists: bool,
    ) -> Result<()> {
        self.create_source_inner(source).map(|_| ())
    }

    async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        _if_not_exists: bool,
    ) -> Result<()> {
        let sink_id = self.create_sink_inner(sink, graph)?;
        self.insert_object_dependencies(sink_id.as_object_id(), dependencies);
        Ok(())
    }

    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
        self.create_subscription_inner(subscription)
    }

    async fn create_index(
        &self,
        mut index: PbIndex,
        mut index_table: PbTable,
        _graph: StreamFragmentGraph,
        _if_not_exists: bool,
    ) -> Result<()> {
        index_table.id = self.gen_id();
        index_table.stream_job_status = PbStreamJobStatus::Created as _;
        index_table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
        self.catalog.write().create_table(&index_table);
        self.add_table_or_index_id(
            index_table.id.as_raw_id(),
            index_table.schema_id,
            index_table.database_id,
        );

        // The index shares its id with its backing table; drop_index relies
        // on this to find the schema via table_id_to_schema_id.
        index.id = index_table.id.as_raw_id().into();
        index.index_table_id = index_table.id;
        self.catalog.write().create_index(&index);
        Ok(())
    }

    // The remaining create_* entry points are not exercised by tests.
    async fn create_function(&self, _function: PbFunction) -> Result<()> {
        unreachable!()
    }

    async fn create_connection(
        &self,
        _connection_name: String,
        _database_id: DatabaseId,
        _schema_id: SchemaId,
        _owner_id: UserId,
        _connection: create_connection_request::Payload,
    ) -> Result<()> {
        unreachable!()
    }

    async fn create_secret(
        &self,
        _secret_name: String,
        _database_id: DatabaseId,
        _schema_id: SchemaId,
        _owner_id: UserId,
        _payload: Vec<u8>,
    ) -> Result<()> {
        unreachable!()
    }

    async fn comment_on(&self, _comment: PbComment) -> Result<()> {
        unreachable!()
    }

    // Drops a table, its associated source (if any) and all indexes on it.
    // CASCADE is rejected because the mock tracks no reverse dependencies.
    async fn drop_table(
        &self,
        source_id: Option<SourceId>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        if let Some(source_id) = source_id {
            self.drop_table_or_source_id(source_id.as_raw_id());
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
        let indexes =
            self.catalog
                .read()
                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
        for index in indexes {
            self.drop_index(index.id, cascade).await?;
        }
        self.catalog
            .write()
            .drop_table(database_id, schema_id, table_id);
        if let Some(source_id) = source_id {
            self.catalog
                .write()
                .drop_source(database_id, schema_id, source_id);
        }
        Ok(())
    }

    async fn drop_view(&self, _view_id: ViewId, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
        // Indexes on the mview must go first, as in the real meta service.
        let indexes =
            self.catalog
                .read()
                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
        for index in indexes {
            self.drop_index(index.id, cascade).await?;
        }
        self.catalog
            .write()
            .drop_table(database_id, schema_id, table_id);
        Ok(())
    }

    async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(source_id.as_raw_id());
        self.catalog
            .write()
            .drop_source(database_id, schema_id, source_id);
        Ok(())
    }

    // Refresh/reset is a no-op in the mock.
    async fn reset_source(&self, _source_id: SourceId) -> Result<()> {
        Ok(())
    }

    async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_sink_id(sink_id.as_raw_id());
        self.catalog
            .write()
            .drop_sink(database_id, schema_id, sink_id);
        Ok(())
    }

    async fn drop_subscription(
        &self,
        subscription_id: SubscriptionId,
        cascade: bool,
    ) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) =
            self.drop_table_or_subscription_id(subscription_id.as_raw_id());
        self.catalog
            .write()
            .drop_subscription(database_id, schema_id, subscription_id);
        Ok(())
    }

    // Drops an index and its backing index table. Works because create_index
    // assigns the index the same raw id as its index table.
    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let &schema_id = self
            .table_id_to_schema_id
            .read()
            .get(&index_id.as_raw_id())
            .unwrap();
        let database_id = self.get_database_id_by_schema(schema_id);

        // Resolve the index under a short-lived read lock before taking the
        // write lock below.
        let index = {
            let catalog_reader = self.catalog.read();
            let schema_catalog = catalog_reader
                .get_schema_by_id(database_id, schema_id)
                .unwrap();
            schema_catalog.get_index_by_id(index_id).unwrap().clone()
        };

        let index_table_id = index.index_table().id;
        let (database_id, schema_id) = self.drop_table_or_index_id(index_id.as_raw_id());
        self.catalog
            .write()
            .drop_index(database_id, schema_id, index_id);
        self.catalog
            .write()
            .drop_table(database_id, schema_id, index_table_id);
        Ok(())
    }

    async fn drop_function(&self, _function_id: FunctionId, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_connection(&self, _connection_id: ConnectionId, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_secret(&self, _secret_id: SecretId, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_database(&self, database_id: DatabaseId) -> Result<()> {
        self.catalog.write().drop_database(database_id);
        Ok(())
    }

    async fn drop_schema(&self, schema_id: SchemaId, _cascade: bool) -> Result<()> {
        // Also removes the schema from the schema->database side table.
        let database_id = self.drop_schema_id(schema_id);
        self.catalog.write().drop_schema(database_id, schema_id);
        Ok(())
    }

    // Only table renames are supported by the mock.
    async fn alter_name(
        &self,
        object_id: alter_name_request::Object,
        object_name: &str,
    ) -> Result<()> {
        match object_id {
            alter_name_request::Object::TableId(table_id) => {
                self.catalog
                    .write()
                    .alter_table_name_by_id(table_id, object_name);
                Ok(())
            }
            _ => {
                unimplemented!()
            }
        }
    }

    async fn alter_source(&self, source: PbSource) -> Result<()> {
        self.catalog.write().update_source(&source);
        Ok(())
    }

    // Linear scan over all databases/schemas; only table ownership changes
    // are supported.
    async fn alter_owner(&self, object: Object, owner_id: UserId) -> Result<()> {
        for database in self.catalog.read().iter_databases() {
            for schema in database.iter_schemas() {
                match object {
                    Object::TableId(table_id) => {
                        if let Some(table) = schema.get_created_table_by_id(TableId::from(table_id))
                        {
                            let mut pb_table = table.to_prost();
                            pb_table.owner = owner_id;
                            self.catalog.write().update_table(&pb_table);
                            return Ok(());
                        }
                    }
                    _ => unreachable!(),
                }
            }
        }

        Err(ErrorCode::ItemNotFound(format!("object not found: {:?}", object)).into())
    }

    async fn alter_subscription_retention(
        &self,
        subscription_id: SubscriptionId,
        retention_seconds: u64,
        definition: String,
    ) -> Result<()> {
        for database in self.catalog.read().iter_databases() {
            for schema in database.iter_schemas() {
                if let Some(subscription) = schema.get_subscription_by_id(subscription_id) {
                    let mut pb_subscription = subscription.to_proto();
                    pb_subscription.retention_seconds = retention_seconds;
                    pb_subscription.definition = definition;
                    self.catalog.write().update_subscription(&pb_subscription);
                    return Ok(());
                }
            }
        }

        Err(
            ErrorCode::ItemNotFound(format!("subscription not found: {:?}", subscription_id))
                .into(),
        )
    }

    // Moves a table to a new schema; only tables are supported.
    async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> Result<()> {
        match object {
            alter_set_schema_request::Object::TableId(table_id) => {
                let mut pb_table = {
                    let reader = self.catalog.read();
                    let table = reader.get_any_table_by_id(table_id)?.to_owned();
                    table.to_prost()
                };
                pb_table.schema_id = new_schema_id;
                self.catalog.write().update_table(&pb_table);
                // Keep the id->schema side table consistent with the move.
                self.table_id_to_schema_id
                    .write()
                    .insert(table_id.as_raw_id(), new_schema_id);
                Ok(())
            }
            _ => unreachable!(),
        }
    }

    async fn alter_parallelism(
        &self,
        _job_id: JobId,
        _parallelism: PbTableParallelism,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }

    async fn alter_backfill_parallelism(
        &self,
        _job_id: JobId,
        _parallelism: Option<PbTableParallelism>,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }

    async fn alter_config(
        &self,
        _job_id: JobId,
        _entries_to_add: HashMap<String, String>,
        _keys_to_remove: Vec<String>,
    ) -> Result<()> {
        todo!()
    }

    async fn alter_swap_rename(&self, _object: alter_swap_rename_request::Object) -> Result<()> {
        todo!()
    }

    async fn alter_secret(
        &self,
        _secret_id: SecretId,
        _secret_name: String,
        _database_id: DatabaseId,
        _schema_id: SchemaId,
        _owner_id: UserId,
        _payload: Vec<u8>,
    ) -> Result<()> {
        unreachable!()
    }

    async fn alter_resource_group(
        &self,
        _table_id: TableId,
        _resource_group: Option<String>,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }

    async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> Result<()> {
        // Read-modify-write under separate read/write lock scopes.
        let mut pb_database = {
            let reader = self.catalog.read();
            let database = reader.get_database_by_id(database_id)?.to_owned();
            database.to_prost()
        };
        match param {
            AlterDatabaseParam::BarrierIntervalMs(interval) => {
                pb_database.barrier_interval_ms = interval;
            }
            AlterDatabaseParam::CheckpointFrequency(frequency) => {
                pb_database.checkpoint_frequency = frequency;
            }
        }
        self.catalog.write().update_database(&pb_database);
        Ok(())
    }

    async fn create_iceberg_table(
        &self,
        _table_job_info: PbTableJobInfo,
        _sink_job_info: PbSinkJobInfo,
        _iceberg_source: PbSource,
        _if_not_exists: bool,
    ) -> Result<()> {
        todo!()
    }

    // Nothing to wait for: all mock DDL is applied synchronously.
    async fn wait(&self) -> Result<()> {
        Ok(())
    }
}
809
impl MockCatalogWriter {
    /// Creates a writer whose catalog is pre-populated with the default
    /// database (id 0) and its three schemas (ids 1-3), matching what a
    /// freshly-bootstrapped cluster contains.
    pub fn new(
        catalog: Arc<RwLock<Catalog>>,
        hummock_snapshot_manager: HummockSnapshotManagerRef,
    ) -> Self {
        catalog.write().create_database(&PbDatabase {
            id: 0.into(),
            name: DEFAULT_DATABASE_NAME.to_owned(),
            owner: DEFAULT_SUPER_USER_ID,
            resource_group: DEFAULT_RESOURCE_GROUP.to_owned(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        });
        catalog.write().create_schema(&PbSchema {
            id: 1.into(),
            name: DEFAULT_SCHEMA_NAME.to_owned(),
            database_id: 0.into(),
            owner: DEFAULT_SUPER_USER_ID,
        });
        catalog.write().create_schema(&PbSchema {
            id: 2.into(),
            name: PG_CATALOG_SCHEMA_NAME.to_owned(),
            database_id: 0.into(),
            owner: DEFAULT_SUPER_USER_ID,
        });
        catalog.write().create_schema(&PbSchema {
            id: 3.into(),
            name: RW_CATALOG_SCHEMA_NAME.to_owned(),
            database_id: 0.into(),
            owner: DEFAULT_SUPER_USER_ID,
        });
        // Seed the schema->database side table with the built-in schemas.
        let mut map: HashMap<SchemaId, DatabaseId> = HashMap::new();
        map.insert(1_u32.into(), 0_u32.into());
        map.insert(2_u32.into(), 0_u32.into());
        map.insert(3_u32.into(), 0_u32.into());
        Self {
            catalog,
            // Ids 0-3 are taken by the defaults above; see gen_id.
            id: AtomicU32::new(3),
            table_id_to_schema_id: Default::default(),
            schema_id_to_database_id: RwLock::new(map),
            hummock_snapshot_manager,
        }
    }

    // Returns the next free id. The counter starts at 3 and the +1 skips the
    // reserved ids, so the first generated id is 4.
    fn gen_id<T: From<u32>>(&self) -> T {
        (self.id.fetch_add(1, Ordering::SeqCst) + 1).into()
    }

    // The add_/drop_ pairs below maintain the raw-id -> schema side table
    // for every object kind; the database id is derivable from the schema,
    // so the `_database_id` parameters are ignored.
    fn add_table_or_source_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    // Removes the mapping and returns the object's (database, schema) pair.
    // Panics if the id was never registered.
    fn drop_table_or_source_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn add_table_or_sink_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn add_table_or_subscription_id(
        &self,
        table_id: u32,
        schema_id: SchemaId,
        _database_id: DatabaseId,
    ) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn add_table_or_index_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn drop_table_or_sink_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn drop_table_or_subscription_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn drop_table_or_index_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn add_schema_id(&self, schema_id: SchemaId, database_id: DatabaseId) {
        self.schema_id_to_database_id
            .write()
            .insert(schema_id, database_id);
    }

    // Panics if the schema was never registered.
    fn drop_schema_id(&self, schema_id: SchemaId) -> DatabaseId {
        self.schema_id_to_database_id
            .write()
            .remove(&schema_id)
            .unwrap()
    }

    // Assigns a fresh id, registers the source and returns its id.
    fn create_source_inner(&self, mut source: PbSource) -> Result<SourceId> {
        source.id = self.gen_id();
        self.catalog.write().create_source(&source);
        self.add_table_or_source_id(source.id.as_raw_id(), source.schema_id, source.database_id);
        Ok(source.id)
    }

    fn create_sink_inner(&self, mut sink: PbSink, _graph: StreamFragmentGraph) -> Result<SinkId> {
        sink.id = self.gen_id();
        sink.stream_job_status = PbStreamJobStatus::Created as _;
        self.catalog.write().create_sink(&sink);
        self.add_table_or_sink_id(sink.id.as_raw_id(), sink.schema_id, sink.database_id);
        Ok(sink.id)
    }

    fn create_subscription_inner(&self, mut subscription: PbSubscription) -> Result<()> {
        subscription.id = self.gen_id();
        self.catalog.write().create_subscription(&subscription);
        self.add_table_or_subscription_id(
            subscription.id.as_raw_id(),
            subscription.schema_id,
            subscription.database_id,
        );
        Ok(())
    }

    // Panics if the schema was never registered via add_schema_id.
    fn get_database_id_by_schema(&self, schema_id: SchemaId) -> DatabaseId {
        *self
            .schema_id_to_database_id
            .read()
            .get(&schema_id)
            .unwrap()
    }

    // Resolves an object id to its type by scanning every schema; falls back
    // to Unspecified when the id matches nothing.
    fn get_object_type(&self, object_id: ObjectId) -> PbObjectType {
        let catalog = self.catalog.read();
        for database in catalog.iter_databases() {
            for schema in database.iter_schemas() {
                if let Some(table) = schema.get_created_table_by_id(object_id.as_table_id()) {
                    return if table.is_mview() {
                        PbObjectType::Mview
                    } else {
                        PbObjectType::Table
                    };
                }
                if schema.get_source_by_id(object_id.as_source_id()).is_some() {
                    return PbObjectType::Source;
                }
                if schema.get_view_by_id(object_id.as_view_id()).is_some() {
                    return PbObjectType::View;
                }
                if schema.get_index_by_id(object_id.as_index_id()).is_some() {
                    return PbObjectType::Index;
                }
            }
        }
        PbObjectType::Unspecified
    }

    // Records that `object_id` depends on each id in `dependencies`; no-op
    // for an empty set.
    fn insert_object_dependencies(&self, object_id: ObjectId, dependencies: HashSet<ObjectId>) {
        if dependencies.is_empty() {
            return;
        }
        let dependencies = dependencies
            .into_iter()
            .map(|referenced_object_id| PbObjectDependency {
                object_id,
                referenced_object_id,
                referenced_object_type: self.get_object_type(referenced_object_id) as i32,
            })
            .collect();
        self.catalog
            .write()
            .insert_object_dependencies(dependencies);
    }
}
1013
/// A `UserInfoWriter` that applies user DDL directly to an in-memory
/// `UserInfoManager`, bypassing the meta service.
pub struct MockUserInfoWriter {
    // Monotonic generator for new (non-reserved) user ids.
    id: AtomicU32,
    user_info: Arc<RwLock<UserInfoManager>>,
}
1018
1019#[async_trait::async_trait]
1020impl UserInfoWriter for MockUserInfoWriter {
1021 async fn create_user(&self, user: UserInfo) -> Result<()> {
1022 let mut user = user;
1023 user.id = self.gen_id().into();
1024 self.user_info.write().create_user(user);
1025 Ok(())
1026 }
1027
1028 async fn drop_user(&self, id: UserId) -> Result<()> {
1029 self.user_info.write().drop_user(id);
1030 Ok(())
1031 }
1032
1033 async fn update_user(
1034 &self,
1035 update_user: UserInfo,
1036 update_fields: Vec<UpdateField>,
1037 ) -> Result<()> {
1038 let mut lock = self.user_info.write();
1039 let id = update_user.get_id();
1040 let Some(old_name) = lock.get_user_name_by_id(id) else {
1041 return Ok(());
1042 };
1043 let mut user_info = lock.get_user_by_name(&old_name).unwrap().to_prost();
1044 update_fields.into_iter().for_each(|field| match field {
1045 UpdateField::Super => user_info.is_super = update_user.is_super,
1046 UpdateField::Login => user_info.can_login = update_user.can_login,
1047 UpdateField::CreateDb => user_info.can_create_db = update_user.can_create_db,
1048 UpdateField::CreateUser => user_info.can_create_user = update_user.can_create_user,
1049 UpdateField::AuthInfo => user_info.auth_info.clone_from(&update_user.auth_info),
1050 UpdateField::Rename => user_info.name.clone_from(&update_user.name),
1051 UpdateField::Admin => user_info.is_admin = update_user.is_admin,
1052 UpdateField::Unspecified => unreachable!(),
1053 });
1054 lock.update_user(update_user);
1055 Ok(())
1056 }
1057
1058 async fn grant_privilege(
1061 &self,
1062 users: Vec<UserId>,
1063 privileges: Vec<GrantPrivilege>,
1064 with_grant_option: bool,
1065 _grantor: UserId,
1066 ) -> Result<()> {
1067 let privileges = privileges
1068 .into_iter()
1069 .map(|mut p| {
1070 p.action_with_opts
1071 .iter_mut()
1072 .for_each(|ao| ao.with_grant_option = with_grant_option);
1073 p
1074 })
1075 .collect::<Vec<_>>();
1076 for user_id in users {
1077 if let Some(u) = self.user_info.write().get_user_mut(user_id) {
1078 u.extend_privileges(privileges.clone());
1079 }
1080 }
1081 Ok(())
1082 }
1083
1084 async fn revoke_privilege(
1087 &self,
1088 users: Vec<UserId>,
1089 privileges: Vec<GrantPrivilege>,
1090 _granted_by: UserId,
1091 _revoke_by: UserId,
1092 revoke_grant_option: bool,
1093 _cascade: bool,
1094 ) -> Result<()> {
1095 for user_id in users {
1096 if let Some(u) = self.user_info.write().get_user_mut(user_id) {
1097 u.revoke_privileges(privileges.clone(), revoke_grant_option);
1098 }
1099 }
1100 Ok(())
1101 }
1102
    /// Not supported by this mock; tests backed by `MockUserInfoWriter`
    /// must not exercise default-privilege DDL.
    async fn alter_default_privilege(
        &self,
        _users: Vec<UserId>,
        _database_id: DatabaseId,
        _schemas: Vec<SchemaId>,
        _operation: AlterDefaultPrivilegeOperation,
        _operated_by: UserId,
    ) -> Result<()> {
        // Intentionally left unimplemented: panics if ever reached.
        todo!()
    }
1113}
1114
1115impl MockUserInfoWriter {
1116 pub fn new(user_info: Arc<RwLock<UserInfoManager>>) -> Self {
1117 user_info.write().create_user(UserInfo {
1118 id: DEFAULT_SUPER_USER_ID,
1119 name: DEFAULT_SUPER_USER.to_owned(),
1120 is_super: true,
1121 can_create_db: true,
1122 can_create_user: true,
1123 can_login: true,
1124 ..Default::default()
1125 });
1126 user_info.write().create_user(UserInfo {
1127 id: DEFAULT_SUPER_USER_FOR_ADMIN_ID,
1128 name: DEFAULT_SUPER_USER_FOR_ADMIN.to_owned(),
1129 is_super: true,
1130 can_create_db: true,
1131 can_create_user: true,
1132 can_login: true,
1133 is_admin: true,
1134 ..Default::default()
1135 });
1136 Self {
1137 user_info,
1138 id: AtomicU32::new(NON_RESERVED_USER_ID.as_raw_id()),
1139 }
1140 }
1141
1142 fn gen_id(&self) -> u32 {
1143 self.id.fetch_add(1, Ordering::SeqCst)
1144 }
1145}
1146
1147pub struct MockFrontendMetaClient {}
1148
// Canned `FrontendMetaClient` for tests: listing/getting RPCs return empty or
// default values; RPCs that no current test exercises panic via
// `unimplemented!()` so accidental use fails loudly.
#[async_trait::async_trait]
impl FrontendMetaClient for MockFrontendMetaClient {
    async fn try_unregister(&self) {}

    // Flushing never advances anything in the mock; report the invalid version.
    async fn flush(&self, _database_id: DatabaseId) -> RpcResult<HummockVersionId> {
        Ok(INVALID_VERSION_ID)
    }

    async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult<Vec<u32>> {
        Ok(vec![])
    }

    async fn list_table_fragments(
        &self,
        _table_ids: &[JobId],
    ) -> RpcResult<HashMap<JobId, TableFragmentInfo>> {
        Ok(HashMap::default())
    }

    async fn list_streaming_job_states(&self) -> RpcResult<Vec<StreamingJobState>> {
        Ok(vec![])
    }

    async fn list_fragment_distribution(
        &self,
        _include_node: bool,
    ) -> RpcResult<Vec<FragmentDistribution>> {
        Ok(vec![])
    }

    async fn list_creating_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
        Ok(vec![])
    }

    async fn list_actor_states(&self) -> RpcResult<Vec<ActorState>> {
        Ok(vec![])
    }

    async fn list_actor_splits(&self) -> RpcResult<Vec<ActorSplit>> {
        Ok(vec![])
    }

    async fn list_meta_snapshots(&self) -> RpcResult<Vec<MetaSnapshotMetadata>> {
        Ok(vec![])
    }

    async fn list_sink_log_store_tables(
        &self,
    ) -> RpcResult<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>> {
        Ok(vec![])
    }

    // Pretends the write succeeded and echoes back the default system params.
    async fn set_system_param(
        &self,
        _param: String,
        _value: Option<String>,
    ) -> RpcResult<Option<SystemParamsReader>> {
        Ok(Some(SystemParams::default().into()))
    }

    async fn get_session_params(&self) -> RpcResult<SessionConfig> {
        Ok(Default::default())
    }

    async fn set_session_param(&self, _param: String, _value: Option<String>) -> RpcResult<String> {
        Ok("".to_owned())
    }

    async fn get_ddl_progress(&self) -> RpcResult<Vec<DdlProgress>> {
        Ok(vec![])
    }

    async fn get_tables(
        &self,
        _table_ids: Vec<crate::catalog::TableId>,
        _include_dropped_tables: bool,
    ) -> RpcResult<HashMap<crate::catalog::TableId, Table>> {
        Ok(HashMap::new())
    }

    // Hummock inspection RPCs below are not exercised by tests using this mock.
    async fn list_hummock_pinned_versions(&self) -> RpcResult<Vec<(WorkerId, HummockVersionId)>> {
        unimplemented!()
    }

    async fn list_refresh_table_states(&self) -> RpcResult<Vec<RefreshTableState>> {
        unimplemented!()
    }

    async fn get_hummock_current_version(&self) -> RpcResult<HummockVersion> {
        Ok(HummockVersion::default())
    }

    async fn get_hummock_checkpoint_version(&self) -> RpcResult<HummockVersion> {
        unimplemented!()
    }

    async fn list_version_deltas(&self) -> RpcResult<Vec<HummockVersionDelta>> {
        unimplemented!()
    }

    async fn list_branched_objects(&self) -> RpcResult<Vec<BranchedObject>> {
        unimplemented!()
    }

    async fn list_hummock_compaction_group_configs(&self) -> RpcResult<Vec<CompactionGroupInfo>> {
        unimplemented!()
    }

    async fn list_hummock_active_write_limits(
        &self,
    ) -> RpcResult<HashMap<CompactionGroupId, WriteLimit>> {
        unimplemented!()
    }

    async fn list_hummock_meta_configs(&self) -> RpcResult<HashMap<String, String>> {
        unimplemented!()
    }

    async fn list_event_log(&self) -> RpcResult<Vec<EventLog>> {
        Ok(vec![])
    }

    async fn list_compact_task_assignment(&self) -> RpcResult<Vec<CompactTaskAssignment>> {
        unimplemented!()
    }

    async fn list_all_nodes(&self) -> RpcResult<Vec<WorkerNode>> {
        Ok(vec![])
    }

    async fn list_compact_task_progress(&self) -> RpcResult<Vec<CompactTaskProgress>> {
        unimplemented!()
    }

    async fn recover(&self) -> RpcResult<()> {
        unimplemented!()
    }

    async fn apply_throttle(
        &self,
        _throttle_target: PbThrottleTarget,
        _throttle_type: risingwave_pb::common::PbThrottleType,
        _id: u32,
        _rate_limit: Option<u32>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn alter_fragment_parallelism(
        &self,
        _fragment_ids: Vec<FragmentId>,
        _parallelism: Option<PbTableParallelism>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    // The mock cluster is always considered healthy and running.
    async fn get_cluster_recovery_status(&self) -> RpcResult<RecoveryStatus> {
        Ok(RecoveryStatus::StatusRunning)
    }

    async fn get_cluster_limits(&self) -> RpcResult<Vec<ClusterLimit>> {
        Ok(vec![])
    }

    async fn list_rate_limits(&self) -> RpcResult<Vec<RateLimitInfo>> {
        Ok(vec![])
    }

    async fn list_cdc_progress(&self) -> RpcResult<HashMap<JobId, PbCdcProgress>> {
        Ok(HashMap::default())
    }

    async fn get_meta_store_endpoint(&self) -> RpcResult<String> {
        unimplemented!()
    }

    async fn alter_sink_props(
        &self,
        _sink_id: SinkId,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
        _connector_conn_ref: Option<ConnectionId>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn alter_iceberg_table_props(
        &self,
        _table_id: TableId,
        _sink_id: SinkId,
        _source_id: SourceId,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
        _connector_conn_ref: Option<ConnectionId>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn alter_source_connector_props(
        &self,
        _source_id: SourceId,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
        _connector_conn_ref: Option<ConnectionId>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    // Accepted but ignored: the mock has no connection catalog to update.
    async fn alter_connection_connector_props(
        &self,
        _connection_id: u32,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
    ) -> RpcResult<()> {
        Ok(())
    }

    async fn list_hosted_iceberg_tables(&self) -> RpcResult<Vec<IcebergTable>> {
        unimplemented!()
    }

    async fn list_iceberg_compaction_status(&self) -> RpcResult<Vec<IcebergCompactionStatus>> {
        Ok(vec![])
    }

    async fn get_fragment_by_id(
        &self,
        _fragment_id: FragmentId,
    ) -> RpcResult<Option<FragmentDistribution>> {
        unimplemented!()
    }

    async fn get_fragment_vnodes(
        &self,
        _fragment_id: FragmentId,
    ) -> RpcResult<Vec<(ActorId, Vec<u32>)>> {
        unimplemented!()
    }

    async fn get_actor_vnodes(&self, _actor_id: ActorId) -> RpcResult<Vec<u32>> {
        unimplemented!()
    }

    fn worker_id(&self) -> WorkerId {
        0.into()
    }

    async fn set_sync_log_store_aligned(&self, _job_id: JobId, _aligned: bool) -> RpcResult<()> {
        Ok(())
    }

    // Returns a fixed fake task handle.
    async fn compact_iceberg_table(&self, _sink_id: SinkId) -> RpcResult<u64> {
        Ok(1)
    }

    async fn expire_iceberg_table_snapshots(&self, _sink_id: SinkId) -> RpcResult<()> {
        Ok(())
    }

    async fn refresh(&self, _request: RefreshRequest) -> RpcResult<RefreshResponse> {
        Ok(RefreshResponse { status: None })
    }

    // Stable fake cluster id so tests can assert on it.
    fn cluster_id(&self) -> &str {
        "test-cluster-uuid"
    }

    async fn list_unmigrated_tables(&self) -> RpcResult<HashMap<crate::catalog::TableId, String>> {
        unimplemented!()
    }
}
1420
#[cfg(test)]
/// Proto3 schema used by codec tests. `TestRecordAlterType` changes two field
/// types relative to `TestRecord`, and `TestRecordExt` adds a field — they
/// exercise schema-evolution paths. Compiled via [`create_proto_file`].
pub static PROTO_FILE_DATA: &str = r#"
    syntax = "proto3";
    package test;
    message TestRecord {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
    }
    message TestRecordAlterType {
      string id = 1;
      Country country = 3;
      int32 zipcode = 4;
      float rate = 5;
    }
    message TestRecordExt {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
      string name = 6;
    }
    message Country {
      string address = 1;
      City city = 2;
      string zipcode = 3;
    }
    message City {
      string address = 1;
      string zipcode = 2;
    }"#;
1453
1454pub fn create_proto_file(proto_data: &str) -> NamedTempFile {
1457 let in_file = Builder::new()
1458 .prefix("temp")
1459 .suffix(".proto")
1460 .rand_bytes(8)
1461 .tempfile()
1462 .unwrap();
1463
1464 let out_file = Builder::new()
1465 .prefix("temp")
1466 .suffix(".pb")
1467 .rand_bytes(8)
1468 .tempfile()
1469 .unwrap();
1470
1471 let mut file = in_file.as_file();
1472 file.write_all(proto_data.as_ref())
1473 .expect("writing binary to test file");
1474 file.flush().expect("flush temp file failed");
1475 let include_path = in_file
1476 .path()
1477 .parent()
1478 .unwrap()
1479 .to_string_lossy()
1480 .into_owned();
1481 let out_path = out_file.path().to_string_lossy().into_owned();
1482 let in_path = in_file.path().to_string_lossy().into_owned();
1483 let mut compile = std::process::Command::new("protoc");
1484
1485 let out = compile
1486 .arg("--include_imports")
1487 .arg("-I")
1488 .arg(include_path)
1489 .arg(format!("--descriptor_set_out={}", out_path))
1490 .arg(in_path)
1491 .output()
1492 .expect("failed to compile proto");
1493 if !out.status.success() {
1494 panic!("compile proto failed \n output: {:?}", out);
1495 }
1496 out_file
1497}