risingwave_frontend/test_utils.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::io::Write;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};

use futures_async_stream::for_await;
use parking_lot::RwLock;
use pgwire::net::{Address, AddressRef};
use pgwire::pg_response::StatementType;
use pgwire::pg_server::{SessionId, SessionManager, UserAuthenticator};
use pgwire::types::Row;
use risingwave_common::catalog::{
    AlterDatabaseParam, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER,
    DEFAULT_SUPER_USER_FOR_ADMIN, DEFAULT_SUPER_USER_FOR_ADMIN_ID, DEFAULT_SUPER_USER_ID,
    FunctionId, IndexId, NON_RESERVED_USER_ID, ObjectId, PG_CATALOG_SCHEMA_NAME,
    RW_CATALOG_SCHEMA_NAME, TableId,
};
use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat};
use risingwave_common::id::{ConnectionId, JobId, SourceId, SubscriptionId, ViewId, WorkerId};
use risingwave_common::session_config::SessionConfig;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::util::cluster_limit::ClusterLimit;
use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId, INVALID_VERSION_ID};
use risingwave_pb::backup_service::MetaSnapshotMetadata;
use risingwave_pb::catalog::{
    PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbStreamJobStatus,
    PbSubscription, PbTable, PbView, Table,
};
use risingwave_pb::common::WorkerNode;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
use risingwave_pb::ddl_service::{
    DdlProgress, PbTableJobType, TableJobType, alter_name_request, alter_set_schema_request,
    alter_swap_rename_request, create_connection_request, streaming_job_resource_type,
};
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::{
    BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
};
use risingwave_pb::id::ActorId;
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::{
    EventLog, FragmentDistribution, PbTableParallelism, PbThrottleTarget, RecoveryStatus,
    RefreshRequest, RefreshResponse, SystemParams, list_sink_log_store_tables_response,
};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
use risingwave_pb::user::update_user_request::UpdateField;
use risingwave_pb::user::{GrantPrivilege, UserInfo};
use risingwave_rpc_client::error::Result as RpcResult;
use tempfile::{Builder, NamedTempFile};

use crate::FrontendOpts;
use crate::catalog::catalog_service::CatalogWriter;
use crate::catalog::root_catalog::Catalog;
use crate::catalog::{DatabaseId, FragmentId, SchemaId, SecretId, SinkId};
use crate::error::{ErrorCode, Result, RwError};
use crate::handler::RwPgResponse;
use crate::meta_client::FrontendMetaClient;
use crate::scheduler::HummockSnapshotManagerRef;
use crate::session::{AuthContext, FrontendEnv, SessionImpl};
use crate::user::UserId;
use crate::user::user_manager::UserInfoManager;
use crate::user::user_service::UserInfoWriter;

/// An embedded frontend that starts neither a meta node nor a frontend TCP server.
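///
/// A minimal usage sketch (hypothetical: it assumes `FrontendOpts` can be built
/// via `Default`, which may not match the real options struct):
///
/// ```ignore
/// let frontend = LocalFrontend::new(FrontendOpts::default()).await;
/// let response = frontend.run_sql("CREATE TABLE t (v INT)").await.unwrap();
/// ```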
pub struct LocalFrontend {
    pub opts: FrontendOpts,
    env: FrontendEnv,
}

impl SessionManager for LocalFrontend {
    type Error = RwError;
    type Session = SessionImpl;

    fn create_dummy_session(
        &self,
        _database_id: DatabaseId,
    ) -> std::result::Result<Arc<Self::Session>, Self::Error> {
        unreachable!()
    }

    fn connect(
        &self,
        _database: &str,
        _user_name: &str,
        _peer_addr: AddressRef,
    ) -> std::result::Result<Arc<Self::Session>, Self::Error> {
        Ok(self.session_ref())
    }

    fn cancel_queries_in_session(&self, _session_id: SessionId) {
        unreachable!()
    }

    fn cancel_creating_jobs_in_session(&self, _session_id: SessionId) {
        unreachable!()
    }

    fn end_session(&self, _session: &Self::Session) {
        unreachable!()
    }
}

impl LocalFrontend {
    #[expect(clippy::unused_async)]
    pub async fn new(opts: FrontendOpts) -> Self {
        let env = FrontendEnv::mock();
        Self { opts, env }
    }

    pub async fn run_sql(
        &self,
        sql: impl Into<String>,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        self.session_ref().run_statement(sql, vec![]).await
    }

    pub async fn run_sql_with_session(
        &self,
        session_ref: Arc<SessionImpl>,
        sql: impl Into<String>,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        session_ref.run_statement(sql, vec![]).await
    }

    pub async fn run_user_sql(
        &self,
        sql: impl Into<String>,
        database: String,
        user_name: String,
        user_id: UserId,
    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
        let sql: Arc<str> = Arc::from(sql.into());
        self.session_user_ref(database, user_name, user_id)
            .run_statement(sql, vec![])
            .await
    }

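    /// Runs `sql` and collects each result row's `Debug` representation.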
    pub async fn query_formatted_result(&self, sql: impl Into<String>) -> Vec<String> {
        let mut rsp = self.run_sql(sql).await.unwrap();
        let mut res = vec![];
        #[for_await]
        for row_set in rsp.values_stream() {
            for row in row_set.unwrap() {
                res.push(format!("{:?}", row));
            }
        }
        res
    }

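    /// Runs an `EXPLAIN` statement and joins its output rows into a single
    /// newline-terminated string.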
    pub async fn get_explain_output(&self, sql: impl Into<String>) -> String {
        let mut rsp = self.run_sql(sql).await.unwrap();
        assert_eq!(rsp.stmt_type(), StatementType::EXPLAIN);
        let mut res = String::new();
        #[for_await]
        for row_set in rsp.values_stream() {
            for row in row_set.unwrap() {
                let row: Row = row;
                let row = row.values()[0].as_ref().unwrap();
                res += std::str::from_utf8(row).unwrap();
                res += "\n";
            }
        }
        res
    }

    /// Creates a new session for the default super user on the default database.
    pub fn session_ref(&self) -> Arc<SessionImpl> {
        self.session_user_ref(
            DEFAULT_DATABASE_NAME.to_owned(),
            DEFAULT_SUPER_USER.to_owned(),
            DEFAULT_SUPER_USER_ID,
        )
    }

    pub fn session_user_ref(
        &self,
        database: String,
        user_name: String,
        user_id: UserId,
    ) -> Arc<SessionImpl> {
        Arc::new(SessionImpl::new(
            self.env.clone(),
            AuthContext::new(database, user_name, user_id),
            UserAuthenticator::None,
            // The local frontend uses a placeholder session id.
            (0, 0),
            Address::Tcp(SocketAddr::new(
                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
                6666,
            ))
            .into(),
            Default::default(),
        ))
    }
}

pub async fn get_explain_output(mut rsp: RwPgResponse) -> String {
    if rsp.stmt_type() != StatementType::EXPLAIN {
        panic!("RESPONSE INVALID: {rsp:?}");
    }
    let mut res = String::new();
    #[for_await]
    for row_set in rsp.values_stream() {
        for row in row_set.unwrap() {
            let row: Row = row;
            let row = row.values()[0].as_ref().unwrap();
            res += std::str::from_utf8(row).unwrap();
            res += "\n";
        }
    }
    res
}

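/// A mock [`CatalogWriter`] that applies DDL directly to a shared in-memory
/// [`Catalog`], so planner tests can run without a meta node.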
pub struct MockCatalogWriter {
    catalog: Arc<RwLock<Catalog>>,
    id: AtomicU32,
    table_id_to_schema_id: RwLock<HashMap<u32, SchemaId>>,
    schema_id_to_database_id: RwLock<HashMap<SchemaId, DatabaseId>>,
    hummock_snapshot_manager: HummockSnapshotManagerRef,
}

#[async_trait::async_trait]
impl CatalogWriter for MockCatalogWriter {
    async fn create_database(
        &self,
        db_name: &str,
        owner: UserId,
        resource_group: &str,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
    ) -> Result<()> {
        let database_id = DatabaseId::new(self.gen_id());
        self.catalog.write().create_database(&PbDatabase {
            name: db_name.to_owned(),
            id: database_id,
            owner,
            resource_group: resource_group.to_owned(),
            barrier_interval_ms,
            checkpoint_frequency,
        });
        self.create_schema(database_id, DEFAULT_SCHEMA_NAME, owner)
            .await?;
        self.create_schema(database_id, PG_CATALOG_SCHEMA_NAME, owner)
            .await?;
        self.create_schema(database_id, RW_CATALOG_SCHEMA_NAME, owner)
            .await?;
        Ok(())
    }

    async fn create_schema(
        &self,
        db_id: DatabaseId,
        schema_name: &str,
        owner: UserId,
    ) -> Result<()> {
        let id = self.gen_id();
        self.catalog.write().create_schema(&PbSchema {
            id,
            name: schema_name.to_owned(),
            database_id: db_id,
            owner,
        });
        self.add_schema_id(id, db_id);
        Ok(())
    }

    async fn create_materialized_view(
        &self,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
        _dependencies: HashSet<ObjectId>,
        _resource_type: streaming_job_resource_type::ResourceType,
        _if_not_exists: bool,
    ) -> Result<()> {
        table.id = self.gen_id();
        table.stream_job_status = PbStreamJobStatus::Created as _;
        table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
        self.catalog.write().create_table(&table);
        self.add_table_or_source_id(table.id.as_raw_id(), table.schema_id, table.database_id);
        self.hummock_snapshot_manager.add_table_for_test(table.id);
        Ok(())
    }

    async fn replace_materialized_view(
        &self,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
    ) -> Result<()> {
        table.stream_job_status = PbStreamJobStatus::Created as _;
        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
        self.catalog.write().update_table(&table);
        Ok(())
    }

    async fn create_view(&self, mut view: PbView, _dependencies: HashSet<ObjectId>) -> Result<()> {
        view.id = self.gen_id();
        self.catalog.write().create_view(&view);
        self.add_table_or_source_id(view.id.as_raw_id(), view.schema_id, view.database_id);
        Ok(())
    }

    async fn create_table(
        &self,
        source: Option<PbSource>,
        mut table: PbTable,
        graph: StreamFragmentGraph,
        _job_type: PbTableJobType,
        if_not_exists: bool,
        _dependencies: HashSet<ObjectId>,
    ) -> Result<()> {
        if let Some(source) = source {
            let source_id = self.create_source_inner(source)?;
            table.optional_associated_source_id = Some(source_id.into());
        }
        self.create_materialized_view(
            table,
            graph,
            HashSet::new(),
            streaming_job_resource_type::ResourceType::Regular(true),
            if_not_exists,
        )
        .await?;
        Ok(())
    }

    async fn replace_table(
        &self,
        _source: Option<PbSource>,
        mut table: PbTable,
        _graph: StreamFragmentGraph,
        _job_type: TableJobType,
    ) -> Result<()> {
        table.stream_job_status = PbStreamJobStatus::Created as _;
        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
        self.catalog.write().update_table(&table);
        Ok(())
    }

    async fn replace_source(&self, source: PbSource, _graph: StreamFragmentGraph) -> Result<()> {
        self.catalog.write().update_source(&source);
        Ok(())
    }

    async fn create_source(
        &self,
        source: PbSource,
        _graph: Option<StreamFragmentGraph>,
        _if_not_exists: bool,
    ) -> Result<()> {
        self.create_source_inner(source).map(|_| ())
    }

    async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        _dependencies: HashSet<ObjectId>,
        _if_not_exists: bool,
    ) -> Result<()> {
        self.create_sink_inner(sink, graph)
    }

    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
        self.create_subscription_inner(subscription)
    }

    async fn create_index(
        &self,
        mut index: PbIndex,
        mut index_table: PbTable,
        _graph: StreamFragmentGraph,
        _if_not_exists: bool,
    ) -> Result<()> {
        index_table.id = self.gen_id();
        index_table.stream_job_status = PbStreamJobStatus::Created as _;
        index_table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
        self.catalog.write().create_table(&index_table);
        self.add_table_or_index_id(
            index_table.id.as_raw_id(),
            index_table.schema_id,
            index_table.database_id,
        );

        index.id = index_table.id.as_raw_id().into();
        index.index_table_id = index_table.id;
        self.catalog.write().create_index(&index);
        Ok(())
    }

    async fn create_function(&self, _function: PbFunction) -> Result<()> {
        unreachable!()
    }

    async fn create_connection(
        &self,
        _connection_name: String,
        _database_id: DatabaseId,
        _schema_id: SchemaId,
        _owner_id: UserId,
        _connection: create_connection_request::Payload,
    ) -> Result<()> {
        unreachable!()
    }

    async fn create_secret(
        &self,
        _secret_name: String,
        _database_id: DatabaseId,
        _schema_id: SchemaId,
        _owner_id: UserId,
        _payload: Vec<u8>,
    ) -> Result<()> {
        unreachable!()
    }

    async fn comment_on(&self, _comment: PbComment) -> Result<()> {
        unreachable!()
    }

    async fn drop_table(
        &self,
        source_id: Option<SourceId>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        if let Some(source_id) = source_id {
            self.drop_table_or_source_id(source_id.as_raw_id());
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
        let indexes =
            self.catalog
                .read()
                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
        for index in indexes {
            self.drop_index(index.id, cascade).await?;
        }
        self.catalog
            .write()
            .drop_table(database_id, schema_id, table_id);
        if let Some(source_id) = source_id {
            self.catalog
                .write()
                .drop_source(database_id, schema_id, source_id);
        }
        Ok(())
    }

    async fn drop_view(&self, _view_id: ViewId, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
        let indexes =
            self.catalog
                .read()
                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
        for index in indexes {
            self.drop_index(index.id, cascade).await?;
        }
        self.catalog
            .write()
            .drop_table(database_id, schema_id, table_id);
        Ok(())
    }

    async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_source_id(source_id.as_raw_id());
        self.catalog
            .write()
            .drop_source(database_id, schema_id, source_id);
        Ok(())
    }

    async fn reset_source(&self, _source_id: SourceId) -> Result<()> {
        Ok(())
    }

    async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) = self.drop_table_or_sink_id(sink_id.as_raw_id());
        self.catalog
            .write()
            .drop_sink(database_id, schema_id, sink_id);
        Ok(())
    }

    async fn drop_subscription(
        &self,
        subscription_id: SubscriptionId,
        cascade: bool,
    ) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let (database_id, schema_id) =
            self.drop_table_or_subscription_id(subscription_id.as_raw_id());
        self.catalog
            .write()
            .drop_subscription(database_id, schema_id, subscription_id);
        Ok(())
    }

    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
        if cascade {
            return Err(ErrorCode::NotSupported(
                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
                "use drop instead".to_owned(),
            )
            .into());
        }
        let &schema_id = self
            .table_id_to_schema_id
            .read()
            .get(&index_id.as_raw_id())
            .unwrap();
        let database_id = self.get_database_id_by_schema(schema_id);

        let index = {
            let catalog_reader = self.catalog.read();
            let schema_catalog = catalog_reader
                .get_schema_by_id(database_id, schema_id)
                .unwrap();
            schema_catalog.get_index_by_id(index_id).unwrap().clone()
        };

        let index_table_id = index.index_table().id;
        let (database_id, schema_id) = self.drop_table_or_index_id(index_id.as_raw_id());
        self.catalog
            .write()
            .drop_index(database_id, schema_id, index_id);
        self.catalog
            .write()
            .drop_table(database_id, schema_id, index_table_id);
        Ok(())
    }

    async fn drop_function(&self, _function_id: FunctionId, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_connection(&self, _connection_id: ConnectionId, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_secret(&self, _secret_id: SecretId, _cascade: bool) -> Result<()> {
        unreachable!()
    }

    async fn drop_database(&self, database_id: DatabaseId) -> Result<()> {
        self.catalog.write().drop_database(database_id);
        Ok(())
    }

    async fn drop_schema(&self, schema_id: SchemaId, _cascade: bool) -> Result<()> {
        let database_id = self.drop_schema_id(schema_id);
        self.catalog.write().drop_schema(database_id, schema_id);
        Ok(())
    }

    async fn alter_name(
        &self,
        object_id: alter_name_request::Object,
        object_name: &str,
    ) -> Result<()> {
        match object_id {
            alter_name_request::Object::TableId(table_id) => {
                self.catalog
                    .write()
                    .alter_table_name_by_id(table_id, object_name);
                Ok(())
            }
            _ => {
                unimplemented!()
            }
        }
    }

    async fn alter_source(&self, source: PbSource) -> Result<()> {
        self.catalog.write().update_source(&source);
        Ok(())
    }

    async fn alter_owner(&self, object: Object, owner_id: UserId) -> Result<()> {
        for database in self.catalog.read().iter_databases() {
            for schema in database.iter_schemas() {
                match object {
                    Object::TableId(table_id) => {
                        if let Some(table) = schema.get_created_table_by_id(TableId::from(table_id))
                        {
                            let mut pb_table = table.to_prost();
                            pb_table.owner = owner_id;
                            self.catalog.write().update_table(&pb_table);
                            return Ok(());
                        }
                    }
                    _ => unreachable!(),
                }
            }
        }

        Err(ErrorCode::ItemNotFound(format!("object not found: {:?}", object)).into())
    }

    async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> Result<()> {
        match object {
            alter_set_schema_request::Object::TableId(table_id) => {
                let mut pb_table = {
                    let reader = self.catalog.read();
                    let table = reader.get_any_table_by_id(table_id)?.to_owned();
                    table.to_prost()
                };
                pb_table.schema_id = new_schema_id;
                self.catalog.write().update_table(&pb_table);
                self.table_id_to_schema_id
                    .write()
                    .insert(table_id.as_raw_id(), new_schema_id);
                Ok(())
            }
            _ => unreachable!(),
        }
    }

    async fn alter_parallelism(
        &self,
        _job_id: JobId,
        _parallelism: PbTableParallelism,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }

    async fn alter_config(
        &self,
        _job_id: JobId,
        _entries_to_add: HashMap<String, String>,
        _keys_to_remove: Vec<String>,
    ) -> Result<()> {
        todo!()
    }

    async fn alter_swap_rename(&self, _object: alter_swap_rename_request::Object) -> Result<()> {
        todo!()
    }

    async fn alter_secret(
        &self,
        _secret_id: SecretId,
        _secret_name: String,
        _database_id: DatabaseId,
        _schema_id: SchemaId,
        _owner_id: UserId,
        _payload: Vec<u8>,
    ) -> Result<()> {
        unreachable!()
    }

    async fn alter_resource_group(
        &self,
        _table_id: TableId,
        _resource_group: Option<String>,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }

    async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> Result<()> {
        let mut pb_database = {
            let reader = self.catalog.read();
            let database = reader.get_database_by_id(database_id)?.to_owned();
            database.to_prost()
        };
        match param {
            AlterDatabaseParam::BarrierIntervalMs(interval) => {
                pb_database.barrier_interval_ms = interval;
            }
            AlterDatabaseParam::CheckpointFrequency(frequency) => {
                pb_database.checkpoint_frequency = frequency;
            }
        }
        self.catalog.write().update_database(&pb_database);
        Ok(())
    }

    async fn create_iceberg_table(
        &self,
        _table_job_info: PbTableJobInfo,
        _sink_job_info: PbSinkJobInfo,
        _iceberg_source: PbSource,
        _if_not_exists: bool,
    ) -> Result<()> {
        todo!()
    }

    async fn wait(&self) -> Result<()> {
        Ok(())
    }
}

impl MockCatalogWriter {
    pub fn new(
        catalog: Arc<RwLock<Catalog>>,
        hummock_snapshot_manager: HummockSnapshotManagerRef,
    ) -> Self {
        catalog.write().create_database(&PbDatabase {
            id: 0.into(),
            name: DEFAULT_DATABASE_NAME.to_owned(),
            owner: DEFAULT_SUPER_USER_ID,
            resource_group: DEFAULT_RESOURCE_GROUP.to_owned(),
            barrier_interval_ms: None,
            checkpoint_frequency: None,
        });
        catalog.write().create_schema(&PbSchema {
            id: 1.into(),
            name: DEFAULT_SCHEMA_NAME.to_owned(),
            database_id: 0.into(),
            owner: DEFAULT_SUPER_USER_ID,
        });
        catalog.write().create_schema(&PbSchema {
            id: 2.into(),
            name: PG_CATALOG_SCHEMA_NAME.to_owned(),
            database_id: 0.into(),
            owner: DEFAULT_SUPER_USER_ID,
        });
        catalog.write().create_schema(&PbSchema {
            id: 3.into(),
            name: RW_CATALOG_SCHEMA_NAME.to_owned(),
            database_id: 0.into(),
            owner: DEFAULT_SUPER_USER_ID,
        });
        let mut map: HashMap<SchemaId, DatabaseId> = HashMap::new();
        map.insert(1_u32.into(), 0_u32.into());
        map.insert(2_u32.into(), 0_u32.into());
        map.insert(3_u32.into(), 0_u32.into());
        Self {
            catalog,
            id: AtomicU32::new(3),
            table_id_to_schema_id: Default::default(),
            schema_id_to_database_id: RwLock::new(map),
            hummock_snapshot_manager,
        }
    }

    fn gen_id<T: From<u32>>(&self) -> T {
        // Id 0 is taken by the `dev` database and schema, so skip past it.
        (self.id.fetch_add(1, Ordering::SeqCst) + 1).into()
    }

    fn add_table_or_source_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn drop_table_or_source_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn add_table_or_sink_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn add_table_or_subscription_id(
        &self,
        table_id: u32,
        schema_id: SchemaId,
        _database_id: DatabaseId,
    ) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn add_table_or_index_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
        self.table_id_to_schema_id
            .write()
            .insert(table_id, schema_id);
    }

    fn drop_table_or_sink_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn drop_table_or_subscription_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn drop_table_or_index_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
        let schema_id = self
            .table_id_to_schema_id
            .write()
            .remove(&table_id)
            .unwrap();
        (self.get_database_id_by_schema(schema_id), schema_id)
    }

    fn add_schema_id(&self, schema_id: SchemaId, database_id: DatabaseId) {
        self.schema_id_to_database_id
            .write()
            .insert(schema_id, database_id);
    }

    fn drop_schema_id(&self, schema_id: SchemaId) -> DatabaseId {
        self.schema_id_to_database_id
            .write()
            .remove(&schema_id)
            .unwrap()
    }

    fn create_source_inner(&self, mut source: PbSource) -> Result<SourceId> {
        source.id = self.gen_id();
        self.catalog.write().create_source(&source);
        self.add_table_or_source_id(source.id.as_raw_id(), source.schema_id, source.database_id);
        Ok(source.id)
    }

    fn create_sink_inner(&self, mut sink: PbSink, _graph: StreamFragmentGraph) -> Result<()> {
        sink.id = self.gen_id();
        sink.stream_job_status = PbStreamJobStatus::Created as _;
        self.catalog.write().create_sink(&sink);
        self.add_table_or_sink_id(sink.id.as_raw_id(), sink.schema_id, sink.database_id);
        Ok(())
    }

    fn create_subscription_inner(&self, mut subscription: PbSubscription) -> Result<()> {
        subscription.id = self.gen_id();
        self.catalog.write().create_subscription(&subscription);
        self.add_table_or_subscription_id(
            subscription.id.as_raw_id(),
            subscription.schema_id,
            subscription.database_id,
        );
        Ok(())
    }

    fn get_database_id_by_schema(&self, schema_id: SchemaId) -> DatabaseId {
        *self
            .schema_id_to_database_id
            .read()
            .get(&schema_id)
            .unwrap()
    }
}

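/// A mock [`UserInfoWriter`] backed by an in-memory [`UserInfoManager`];
/// [`MockUserInfoWriter::new`] pre-populates the default super users.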
pub struct MockUserInfoWriter {
    id: AtomicU32,
    user_info: Arc<RwLock<UserInfoManager>>,
}

#[async_trait::async_trait]
impl UserInfoWriter for MockUserInfoWriter {
    async fn create_user(&self, user: UserInfo) -> Result<()> {
        let mut user = user;
        user.id = self.gen_id().into();
        self.user_info.write().create_user(user);
        Ok(())
    }

    async fn drop_user(&self, id: UserId) -> Result<()> {
        self.user_info.write().drop_user(id);
        Ok(())
    }

    async fn update_user(
        &self,
        update_user: UserInfo,
        update_fields: Vec<UpdateField>,
    ) -> Result<()> {
        let mut lock = self.user_info.write();
        let id = update_user.get_id();
        let Some(old_name) = lock.get_user_name_by_id(id) else {
            return Ok(());
        };
        let mut user_info = lock.get_user_by_name(&old_name).unwrap().to_prost();
        update_fields.into_iter().for_each(|field| match field {
            UpdateField::Super => user_info.is_super = update_user.is_super,
            UpdateField::Login => user_info.can_login = update_user.can_login,
            UpdateField::CreateDb => user_info.can_create_db = update_user.can_create_db,
            UpdateField::CreateUser => user_info.can_create_user = update_user.can_create_user,
            UpdateField::AuthInfo => user_info.auth_info.clone_from(&update_user.auth_info),
            UpdateField::Rename => user_info.name.clone_from(&update_user.name),
            UpdateField::Admin => user_info.is_admin = update_user.is_admin,
            UpdateField::Unspecified => unreachable!(),
        });
        lock.update_user(update_user);
        Ok(())
    }

    /// `MockUserInfoWriter` does not support expanding privileges with `GrantAllTables` or
    /// `GrantAllSources` when granting privileges to a user.
    async fn grant_privilege(
        &self,
        users: Vec<UserId>,
        privileges: Vec<GrantPrivilege>,
        with_grant_option: bool,
        _grantor: UserId,
    ) -> Result<()> {
        let privileges = privileges
            .into_iter()
            .map(|mut p| {
                p.action_with_opts
                    .iter_mut()
                    .for_each(|ao| ao.with_grant_option = with_grant_option);
                p
            })
            .collect::<Vec<_>>();
        for user_id in users {
            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
                u.extend_privileges(privileges.clone());
            }
        }
        Ok(())
    }

    /// `MockUserInfoWriter` does not support expanding privileges with `RevokeAllTables` or
    /// `RevokeAllSources` when revoking privileges from a user.
    async fn revoke_privilege(
        &self,
        users: Vec<UserId>,
        privileges: Vec<GrantPrivilege>,
        _granted_by: UserId,
        _revoke_by: UserId,
        revoke_grant_option: bool,
        _cascade: bool,
    ) -> Result<()> {
        for user_id in users {
            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
                u.revoke_privileges(privileges.clone(), revoke_grant_option);
            }
        }
        Ok(())
    }

    async fn alter_default_privilege(
        &self,
        _users: Vec<UserId>,
        _database_id: DatabaseId,
        _schemas: Vec<SchemaId>,
        _operation: AlterDefaultPrivilegeOperation,
        _operated_by: UserId,
    ) -> Result<()> {
        todo!()
    }
}

impl MockUserInfoWriter {
    pub fn new(user_info: Arc<RwLock<UserInfoManager>>) -> Self {
        user_info.write().create_user(UserInfo {
            id: DEFAULT_SUPER_USER_ID,
            name: DEFAULT_SUPER_USER.to_owned(),
            is_super: true,
            can_create_db: true,
            can_create_user: true,
            can_login: true,
            ..Default::default()
        });
        user_info.write().create_user(UserInfo {
            id: DEFAULT_SUPER_USER_FOR_ADMIN_ID,
            name: DEFAULT_SUPER_USER_FOR_ADMIN.to_owned(),
            is_super: true,
            can_create_db: true,
            can_create_user: true,
            can_login: true,
            is_admin: true,
            ..Default::default()
        });
        Self {
            user_info,
            id: AtomicU32::new(NON_RESERVED_USER_ID.as_raw_id()),
        }
    }

    fn gen_id(&self) -> u32 {
        self.id.fetch_add(1, Ordering::SeqCst)
    }
}

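/// A mock [`FrontendMetaClient`] that returns empty or default values for the
/// RPCs tests exercise and leaves the rest `unimplemented!`.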
pub struct MockFrontendMetaClient {}

#[async_trait::async_trait]
impl FrontendMetaClient for MockFrontendMetaClient {
    async fn try_unregister(&self) {}

    async fn flush(&self, _database_id: DatabaseId) -> RpcResult<HummockVersionId> {
        Ok(INVALID_VERSION_ID)
    }

    async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult<Vec<u32>> {
        Ok(vec![])
    }

    async fn list_table_fragments(
        &self,
        _table_ids: &[JobId],
    ) -> RpcResult<HashMap<JobId, TableFragmentInfo>> {
        Ok(HashMap::default())
    }

    async fn list_streaming_job_states(&self) -> RpcResult<Vec<StreamingJobState>> {
        Ok(vec![])
    }

    async fn list_fragment_distribution(
        &self,
        _include_node: bool,
    ) -> RpcResult<Vec<FragmentDistribution>> {
        Ok(vec![])
    }

    async fn list_creating_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
        Ok(vec![])
    }

    async fn list_actor_states(&self) -> RpcResult<Vec<ActorState>> {
        Ok(vec![])
    }

    async fn list_actor_splits(&self) -> RpcResult<Vec<ActorSplit>> {
        Ok(vec![])
    }

    async fn list_meta_snapshots(&self) -> RpcResult<Vec<MetaSnapshotMetadata>> {
        Ok(vec![])
    }

    async fn list_sink_log_store_tables(
        &self,
    ) -> RpcResult<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>> {
        Ok(vec![])
    }

    async fn set_system_param(
        &self,
        _param: String,
        _value: Option<String>,
    ) -> RpcResult<Option<SystemParamsReader>> {
        Ok(Some(SystemParams::default().into()))
    }

    async fn get_session_params(&self) -> RpcResult<SessionConfig> {
        Ok(Default::default())
    }

    async fn set_session_param(&self, _param: String, _value: Option<String>) -> RpcResult<String> {
        Ok("".to_owned())
    }

    async fn get_ddl_progress(&self) -> RpcResult<Vec<DdlProgress>> {
        Ok(vec![])
    }

    async fn get_tables(
        &self,
        _table_ids: Vec<crate::catalog::TableId>,
        _include_dropped_tables: bool,
    ) -> RpcResult<HashMap<crate::catalog::TableId, Table>> {
        Ok(HashMap::new())
    }

    async fn list_hummock_pinned_versions(&self) -> RpcResult<Vec<(WorkerId, HummockVersionId)>> {
        unimplemented!()
    }

    async fn list_refresh_table_states(&self) -> RpcResult<Vec<RefreshTableState>> {
        unimplemented!()
    }

    async fn get_hummock_current_version(&self) -> RpcResult<HummockVersion> {
        Ok(HummockVersion::default())
    }

    async fn get_hummock_checkpoint_version(&self) -> RpcResult<HummockVersion> {
        unimplemented!()
    }

    async fn list_version_deltas(&self) -> RpcResult<Vec<HummockVersionDelta>> {
        unimplemented!()
    }

    async fn list_branched_objects(&self) -> RpcResult<Vec<BranchedObject>> {
        unimplemented!()
    }

    async fn list_hummock_compaction_group_configs(&self) -> RpcResult<Vec<CompactionGroupInfo>> {
        unimplemented!()
    }

    async fn list_hummock_active_write_limits(
        &self,
    ) -> RpcResult<HashMap<CompactionGroupId, WriteLimit>> {
        unimplemented!()
    }

    async fn list_hummock_meta_configs(&self) -> RpcResult<HashMap<String, String>> {
        unimplemented!()
    }

    async fn list_event_log(&self) -> RpcResult<Vec<EventLog>> {
        Ok(vec![])
    }

    async fn list_compact_task_assignment(&self) -> RpcResult<Vec<CompactTaskAssignment>> {
        unimplemented!()
    }

    async fn list_all_nodes(&self) -> RpcResult<Vec<WorkerNode>> {
        Ok(vec![])
    }

    async fn list_compact_task_progress(&self) -> RpcResult<Vec<CompactTaskProgress>> {
        unimplemented!()
    }

    async fn recover(&self) -> RpcResult<()> {
        unimplemented!()
    }

    async fn apply_throttle(
        &self,
        _throttle_target: PbThrottleTarget,
        _throttle_type: risingwave_pb::common::PbThrottleType,
        _id: u32,
        _rate_limit: Option<u32>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn alter_fragment_parallelism(
        &self,
        _fragment_ids: Vec<FragmentId>,
        _parallelism: Option<PbTableParallelism>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn get_cluster_recovery_status(&self) -> RpcResult<RecoveryStatus> {
        Ok(RecoveryStatus::StatusRunning)
    }

    async fn get_cluster_limits(&self) -> RpcResult<Vec<ClusterLimit>> {
        Ok(vec![])
    }

    async fn list_rate_limits(&self) -> RpcResult<Vec<RateLimitInfo>> {
        Ok(vec![])
    }

    async fn list_cdc_progress(&self) -> RpcResult<HashMap<JobId, PbCdcProgress>> {
        Ok(HashMap::default())
    }

    async fn get_meta_store_endpoint(&self) -> RpcResult<String> {
        unimplemented!()
    }

    async fn alter_sink_props(
        &self,
        _sink_id: SinkId,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
        _connector_conn_ref: Option<ConnectionId>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn alter_iceberg_table_props(
        &self,
        _table_id: TableId,
        _sink_id: SinkId,
        _source_id: SourceId,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
        _connector_conn_ref: Option<ConnectionId>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn alter_source_connector_props(
        &self,
        _source_id: SourceId,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
        _connector_conn_ref: Option<ConnectionId>,
    ) -> RpcResult<()> {
        unimplemented!()
    }

    async fn alter_connection_connector_props(
        &self,
        _connection_id: u32,
        _changed_props: BTreeMap<String, String>,
        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
    ) -> RpcResult<()> {
        Ok(())
    }

    async fn list_hosted_iceberg_tables(&self) -> RpcResult<Vec<IcebergTable>> {
        unimplemented!()
    }

    async fn get_fragment_by_id(
        &self,
        _fragment_id: FragmentId,
    ) -> RpcResult<Option<FragmentDistribution>> {
        unimplemented!()
    }

    async fn get_fragment_vnodes(
        &self,
        _fragment_id: FragmentId,
    ) -> RpcResult<Vec<(ActorId, Vec<u32>)>> {
        unimplemented!()
    }

    async fn get_actor_vnodes(&self, _actor_id: ActorId) -> RpcResult<Vec<u32>> {
        unimplemented!()
    }

    fn worker_id(&self) -> WorkerId {
        0.into()
    }

    async fn set_sync_log_store_aligned(&self, _job_id: JobId, _aligned: bool) -> RpcResult<()> {
        Ok(())
    }

    async fn compact_iceberg_table(&self, _sink_id: SinkId) -> RpcResult<u64> {
        Ok(1)
    }

    async fn expire_iceberg_table_snapshots(&self, _sink_id: SinkId) -> RpcResult<()> {
        Ok(())
    }

    async fn refresh(&self, _request: RefreshRequest) -> RpcResult<RefreshResponse> {
        Ok(RefreshResponse { status: None })
    }

    fn cluster_id(&self) -> &str {
        "test-cluster-uuid"
    }

    async fn list_unmigrated_tables(&self) -> RpcResult<HashMap<crate::catalog::TableId, String>> {
        unimplemented!()
    }
}

#[cfg(test)]
pub static PROTO_FILE_DATA: &str = r#"
    syntax = "proto3";
    package test;
    message TestRecord {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
    }
    message TestRecordAlterType {
        string id = 1;
        Country country = 3;
        int32 zipcode = 4;
        float rate = 5;
      }
    message TestRecordExt {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
      string name = 6;
    }
    message Country {
      string address = 1;
      City city = 2;
      string zipcode = 3;
    }
    message City {
      string address = 1;
      string zipcode = 2;
    }"#;

/// Compiles `proto_data` with `protoc` and returns the resulting descriptor file.
/// (`NamedTempFile` automatically deletes the file when it goes out of scope.)
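///
/// A usage sketch, compiling the `PROTO_FILE_DATA` schema defined above:
///
/// ```ignore
/// let file = create_proto_file(PROTO_FILE_DATA);
/// let descriptor_path = file.path().to_string_lossy().into_owned();
/// ```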
pub fn create_proto_file(proto_data: &str) -> NamedTempFile {
    let in_file = Builder::new()
        .prefix("temp")
        .suffix(".proto")
        .rand_bytes(8)
        .tempfile()
        .unwrap();

    let out_file = Builder::new()
        .prefix("temp")
        .suffix(".pb")
        .rand_bytes(8)
        .tempfile()
        .unwrap();

    let mut file = in_file.as_file();
    file.write_all(proto_data.as_ref())
        .expect("writing binary to test file");
    file.flush().expect("flush temp file failed");
    let include_path = in_file
        .path()
        .parent()
        .unwrap()
        .to_string_lossy()
        .into_owned();
    let out_path = out_file.path().to_string_lossy().into_owned();
    let in_path = in_file.path().to_string_lossy().into_owned();
    let mut compile = std::process::Command::new("protoc");

    let out = compile
        .arg("--include_imports")
        .arg("-I")
        .arg(include_path)
        .arg(format!("--descriptor_set_out={}", out_path))
        .arg(in_path)
        .output()
        .expect("failed to compile proto");
    if !out.status.success() {
        panic!("compile proto failed \n output: {:?}", out);
    }
    out_file
}