risingwave_meta/manager/
env.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Deref;
16use std::path::PathBuf;
17use std::sync::Arc;
18use std::sync::atomic::AtomicU32;
19
20use anyhow::Context;
21use risingwave_common::config::{
22    CompactionConfig, DefaultParallelism, ObjectStoreConfig, RpcClientConfig, SessionInitConfig,
23};
24use risingwave_common::system_param::reader::SystemParamsReader;
25use risingwave_common::{bail, system_param};
26use risingwave_meta_model::prelude::Cluster;
27use risingwave_pb::meta::SystemParams;
28use risingwave_rpc_client::{
29    FrontendClientPool, FrontendClientPoolRef, StreamClientPool, StreamClientPoolRef,
30};
31use risingwave_sqlparser::ast::RedactSqlOptionKeywordsRef;
32use sea_orm::EntityTrait;
33
34use crate::MetaResult;
35use crate::barrier::SharedActorInfos;
36use crate::controller::SqlMetaStore;
37use crate::controller::id::{
38    IdGeneratorManager as SqlIdGeneratorManager, IdGeneratorManagerRef as SqlIdGeneratorManagerRef,
39};
40use crate::controller::session_params::{SessionParamsController, SessionParamsControllerRef};
41use crate::controller::system_param::{SystemParamsController, SystemParamsControllerRef};
42use crate::hummock::sequence::SequenceGenerator;
43use crate::manager::event_log::{EventLogManagerRef, start_event_log_manager};
44use crate::manager::{IdleManager, IdleManagerRef, NotificationManager, NotificationManagerRef};
45use crate::model::ClusterId;
46
47/// [`MetaSrvEnv`] is the global environment in Meta service. The instance will be shared by all
48/// kind of managers inside Meta.
49#[derive(Clone)]
50pub struct MetaSrvEnv {
51    /// id generator manager.
52    id_gen_manager_impl: SqlIdGeneratorManagerRef,
53
54    /// system param manager.
55    system_param_manager_impl: SystemParamsControllerRef,
56
57    /// session param manager.
58    session_param_manager_impl: SessionParamsControllerRef,
59
60    /// meta store.
61    meta_store_impl: SqlMetaStore,
62
63    /// notification manager.
64    notification_manager: NotificationManagerRef,
65
66    pub shared_actor_info: SharedActorInfos,
67
68    /// stream client pool memorization.
69    stream_client_pool: StreamClientPoolRef,
70
71    /// rpc client pool for frontend nodes.
72    frontend_client_pool: FrontendClientPoolRef,
73
74    /// idle status manager.
75    idle_manager: IdleManagerRef,
76
77    event_log_manager: EventLogManagerRef,
78
79    /// Unique identifier of the cluster.
80    cluster_id: ClusterId,
81
82    pub hummock_seq: Arc<SequenceGenerator>,
83
84    /// The await-tree registry of the current meta node.
85    await_tree_reg: await_tree::Registry,
86
87    /// options read by all services
88    pub opts: Arc<MetaOpts>,
89
90    actor_id_generator: Arc<AtomicU32>,
91}
92
93/// Options shared by all meta service instances
94#[derive(Clone, serde::Serialize)]
95pub struct MetaOpts {
96    /// Whether to enable the recovery of the cluster. If disabled, the meta service will exit on
97    /// abnormal cases.
98    pub enable_recovery: bool,
99    /// Whether to disable the auto-scaling feature.
100    pub disable_automatic_parallelism_control: bool,
101    /// The number of streaming jobs per scaling operation.
102    pub parallelism_control_batch_size: usize,
103    /// The period of parallelism control trigger.
104    pub parallelism_control_trigger_period_sec: u64,
105    /// The first delay of parallelism control.
106    pub parallelism_control_trigger_first_delay_sec: u64,
107    /// The maximum number of barriers in-flight in the compute nodes.
108    pub in_flight_barrier_nums: usize,
109    /// After specified seconds of idle (no mview or flush), the process will be exited.
110    /// 0 for infinite, process will never be exited due to long idle time.
111    pub max_idle_ms: u64,
112    /// Whether run in compaction detection test mode
113    pub compaction_deterministic_test: bool,
114    /// Default parallelism of units for all streaming jobs.
115    pub default_parallelism: DefaultParallelism,
116
117    /// Interval of invoking a vacuum job, to remove stale metadata from meta store and objects
118    /// from object store.
119    pub vacuum_interval_sec: u64,
120    /// The spin interval inside a vacuum job. It avoids the vacuum job monopolizing resources of
121    /// meta node.
122    pub vacuum_spin_interval_ms: u64,
123    /// Interval of invoking iceberg garbage collection, to expire old snapshots.
124    pub iceberg_gc_interval_sec: u64,
125    /// Maximum time to wait for an iceberg compaction task report before the lease expires.
126    pub iceberg_compaction_report_timeout_sec: u64,
127    /// Maximum time to reuse cached iceberg compaction schedule config before refreshing it.
128    pub iceberg_compaction_config_refresh_interval_sec: u64,
129    pub time_travel_vacuum_interval_sec: u64,
130    pub time_travel_vacuum_max_version_count: Option<u32>,
131    /// Interval of hummock version checkpoint.
132    pub hummock_version_checkpoint_interval_sec: u64,
133    pub enable_hummock_data_archive: bool,
134    /// Compression algorithm for hummock version checkpoint: "zstd", "lz4", or "none".
135    pub checkpoint_compression_algorithm: risingwave_common::config::CheckpointCompression,
136    /// Chunk size in bytes for reading large checkpoints.
137    pub checkpoint_read_chunk_size: usize,
138    /// Maximum number of concurrent chunk reads for large checkpoints.
139    pub checkpoint_read_max_in_flight_chunks: usize,
140    pub hummock_time_travel_snapshot_interval: u64,
141    pub hummock_time_travel_sst_info_fetch_batch_size: usize,
142    pub hummock_time_travel_sst_info_insert_batch_size: usize,
143    pub hummock_time_travel_epoch_version_insert_batch_size: usize,
144    pub hummock_time_travel_delta_fetch_batch_size: usize,
145    pub hummock_gc_history_insert_batch_size: usize,
146    pub hummock_time_travel_filter_out_objects_batch_size: usize,
147    pub hummock_time_travel_filter_out_objects_v1: bool,
148    pub hummock_time_travel_filter_out_objects_list_version_batch_size: usize,
149    pub hummock_time_travel_filter_out_objects_list_delta_batch_size: usize,
150    /// The minimum delta log number a new checkpoint should compact, otherwise the checkpoint
151    /// attempt is rejected. Greater value reduces object store IO, meanwhile it results in
152    /// more loss of in memory `HummockVersionCheckpoint::stale_objects` state when meta node is
153    /// restarted.
154    pub min_delta_log_num_for_hummock_version_checkpoint: u64,
155    /// Objects within `min_sst_retention_time_sec` won't be deleted by hummock full GC, even they
156    /// are dangling.
157    pub min_sst_retention_time_sec: u64,
158    /// Interval of automatic hummock full GC.
159    pub full_gc_interval_sec: u64,
160    /// Max number of object per full GC job can fetch.
161    pub full_gc_object_limit: u64,
162    /// Duration in seconds to retain garbage collection history data.
163    pub gc_history_retention_time_sec: u64,
164    /// Max number of inflight time travel query.
165    pub max_inflight_time_travel_query: u64,
166    /// Enable sanity check when SSTs are committed
167    pub enable_committed_sst_sanity_check: bool,
168    /// Schedule compaction for all compaction groups with this interval.
169    pub periodic_compaction_interval_sec: u64,
170    /// Interval of reporting the number of nodes in the cluster.
171    pub node_num_monitor_interval_sec: u64,
172    /// Whether to protect the drop table operation with incoming sink.
173    pub protect_drop_table_with_incoming_sink: bool,
174    /// The Prometheus endpoint for Meta Dashboard Service.
175    /// The Dashboard service uses this in the following ways:
176    /// 1. Query Prometheus for relevant metrics to find Stream Graph Bottleneck, and display it.
177    /// 2. Provide cluster diagnostics, at `/api/monitor/diagnose` to troubleshoot cluster.
178    ///    These are just examples which show how the Meta Dashboard Service queries Prometheus.
179    pub prometheus_endpoint: Option<String>,
180
181    /// The additional selector used when querying Prometheus.
182    pub prometheus_selector: Option<String>,
183
184    /// The VPC id of the cluster.
185    pub vpc_id: Option<String>,
186
187    /// A usable security group id to assign to a vpc endpoint
188    pub security_group_id: Option<String>,
189
190    /// Default tag for the endpoint created when creating a privatelink connection.
191    /// Will be appended to the tags specified in the `tags` field in with clause in `create
192    /// connection`.
193    pub privatelink_endpoint_default_tags: Option<Vec<(String, String)>>,
194
195    /// Schedule `space_reclaim_compaction` for all compaction groups with this interval.
196    pub periodic_space_reclaim_compaction_interval_sec: u64,
197
198    /// telemetry enabled in config file or not
199    pub telemetry_enabled: bool,
200    /// Schedule `ttl_reclaim_compaction` for all compaction groups with this interval.
201    pub periodic_ttl_reclaim_compaction_interval_sec: u64,
202
203    /// Schedule `tombstone_reclaim_compaction` for all compaction groups with this interval.
204    pub periodic_tombstone_reclaim_compaction_interval_sec: u64,
205
206    /// Schedule the regular compaction-group split job for all compaction groups with this interval.
207    pub periodic_scheduling_compaction_group_split_interval_sec: u64,
208    /// Whether to enable overlap normalization before the regular merge scheduler.
209    pub enable_compaction_group_normalize: bool,
210    /// Maximum normalize splits in one scheduler round. Must be greater than 0.
211    pub max_normalize_splits_per_round: u64,
212
213    /// Whether config object storage bucket lifecycle to purge stale data.
214    pub do_not_config_object_storage_lifecycle: bool,
215
216    pub partition_vnode_count: u32,
217
218    /// threshold of high write throughput of state-table, unit: B/sec
219    pub table_high_write_throughput_threshold: u64,
220    /// threshold of low write throughput of state-table, unit: B/sec
221    pub table_low_write_throughput_threshold: u64,
222
223    pub compaction_task_max_heartbeat_interval_secs: u64,
224    pub compaction_task_max_progress_interval_secs: u64,
225    pub compaction_task_id_refill_capacity: u32,
226    pub compaction_config: Option<CompactionConfig>,
227
228    /// hybrid compaction group config
229    ///
230    /// `hybrid_partition_vnode_count` determines the granularity of vnodes in the hybrid compaction group for SST alignment.
231    /// When `hybrid_partition_vnode_count` > 0, in hybrid compaction group
232    /// - Tables with high write throughput will be split at vnode granularity
233    /// - Tables with high size tables will be split by table granularity
234    ///   When `hybrid_partition_vnode_count` = 0,no longer be special alignment operations for the hybrid compaction group
235    pub hybrid_partition_node_count: u32,
236
237    pub event_log_enabled: bool,
238    pub event_log_channel_max_size: u32,
239    pub advertise_addr: String,
240    /// The number of traces to be cached in-memory by the tracing collector
241    /// embedded in the meta node.
242    pub cached_traces_num: u32,
243    /// The maximum memory usage in bytes for the tracing collector embedded
244    /// in the meta node.
245    pub cached_traces_memory_limit_bytes: usize,
246
247    /// l0 picker whether to select trivial move task
248    pub enable_trivial_move: bool,
249
250    /// l0 multi level picker whether to check the overlap accuracy between sub levels
251    pub enable_check_task_level_overlap: bool,
252    pub enable_dropped_column_reclaim: bool,
253
254    /// Whether to split the compaction group when the size of the group exceeds the threshold.
255    pub split_group_size_ratio: f64,
256
257    /// The interval in seconds for the refresh scheduler to check and trigger scheduled refreshes.
258    pub refresh_scheduler_interval_sec: u64,
259
260    /// To split the compaction group when the high throughput statistics of the group exceeds the threshold.
261    pub table_stat_high_write_throughput_ratio_for_split: f64,
262
263    /// To merge the compaction group when the low throughput statistics of the group exceeds the threshold.
264    pub table_stat_low_write_throughput_ratio_for_merge: f64,
265
266    /// The window seconds of table throughput statistic history for split compaction group.
267    pub table_stat_throuput_window_seconds_for_split: usize,
268
269    /// The window seconds of table throughput statistic history for merge compaction group.
270    pub table_stat_throuput_window_seconds_for_merge: usize,
271
272    /// The configuration of the object store
273    pub object_store_config: ObjectStoreConfig,
274
275    /// The maximum number of trivial move tasks to be picked in a single loop
276    pub max_trivial_move_task_count_per_loop: usize,
277
278    /// The maximum number of times to probe for `PullTaskEvent`
279    pub max_get_task_probe_times: usize,
280
281    pub compact_task_table_size_partition_threshold_low: u64,
282    pub compact_task_table_size_partition_threshold_high: u64,
283
284    pub periodic_scheduling_compaction_group_merge_interval_sec: u64,
285
286    pub compaction_group_merge_dimension_threshold: f64,
287
288    // The private key for the secret store, used when the secret is stored in the meta.
289    pub secret_store_private_key: Option<Vec<u8>>,
290    /// The path of the temp secret file directory.
291    pub temp_secret_file_dir: String,
292
293    // Cluster limits
294    pub actor_cnt_per_worker_parallelism_hard_limit: usize,
295    pub actor_cnt_per_worker_parallelism_soft_limit: usize,
296
297    pub table_change_log_insert_batch_size: u64,
298    pub table_change_log_delete_batch_size: u64,
299
300    pub license_key_path: Option<PathBuf>,
301
302    pub compute_client_config: RpcClientConfig,
303    pub stream_client_config: RpcClientConfig,
304    pub frontend_client_config: RpcClientConfig,
305    pub redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
306
307    pub cdc_table_split_init_sleep_interval_splits: u64,
308    pub cdc_table_split_init_sleep_duration_millis: u64,
309    pub cdc_table_split_init_insert_batch_size: u64,
310
311    pub enable_legacy_table_migration: bool,
312    pub pause_on_next_bootstrap_offline: bool,
313}
314
315impl MetaOpts {
316    /// Default opts for testing. Some tests need `enable_recovery=true`
317    pub fn test(enable_recovery: bool) -> Self {
318        Self {
319            enable_recovery,
320            disable_automatic_parallelism_control: false,
321            parallelism_control_batch_size: 1,
322            parallelism_control_trigger_period_sec: 10,
323            parallelism_control_trigger_first_delay_sec: 30,
324            in_flight_barrier_nums: 40,
325            max_idle_ms: 0,
326            compaction_deterministic_test: false,
327            default_parallelism: DefaultParallelism::Full,
328            vacuum_interval_sec: 30,
329            time_travel_vacuum_interval_sec: 30,
330            time_travel_vacuum_max_version_count: None,
331            vacuum_spin_interval_ms: 0,
332            iceberg_gc_interval_sec: 3600,
333            iceberg_compaction_report_timeout_sec: 30 * 60,
334            iceberg_compaction_config_refresh_interval_sec: 60,
335            hummock_version_checkpoint_interval_sec: 30,
336            enable_hummock_data_archive: false,
337            checkpoint_compression_algorithm:
338                risingwave_common::config::CheckpointCompression::Zstd,
339            checkpoint_read_chunk_size: 128 * 1024 * 1024,
340            checkpoint_read_max_in_flight_chunks: 4,
341            hummock_time_travel_snapshot_interval: 0,
342            hummock_time_travel_sst_info_fetch_batch_size: 10_000,
343            hummock_time_travel_sst_info_insert_batch_size: 10,
344            hummock_time_travel_epoch_version_insert_batch_size: 1000,
345            hummock_time_travel_delta_fetch_batch_size: 1000,
346            hummock_gc_history_insert_batch_size: 1000,
347            hummock_time_travel_filter_out_objects_batch_size: 1000,
348            hummock_time_travel_filter_out_objects_v1: false,
349            hummock_time_travel_filter_out_objects_list_version_batch_size: 10,
350            hummock_time_travel_filter_out_objects_list_delta_batch_size: 1000,
351            min_delta_log_num_for_hummock_version_checkpoint: 1,
352            min_sst_retention_time_sec: 3600 * 24 * 7,
353            full_gc_interval_sec: 3600 * 24 * 7,
354            full_gc_object_limit: 100_000,
355            gc_history_retention_time_sec: 3600 * 24 * 7,
356            max_inflight_time_travel_query: 1000,
357            enable_committed_sst_sanity_check: false,
358            periodic_compaction_interval_sec: 300,
359            node_num_monitor_interval_sec: 10,
360            protect_drop_table_with_incoming_sink: false,
361            prometheus_endpoint: None,
362            prometheus_selector: None,
363            vpc_id: None,
364            security_group_id: None,
365            privatelink_endpoint_default_tags: None,
366            periodic_space_reclaim_compaction_interval_sec: 60,
367            telemetry_enabled: false,
368            periodic_ttl_reclaim_compaction_interval_sec: 60,
369            periodic_tombstone_reclaim_compaction_interval_sec: 60,
370            periodic_scheduling_compaction_group_split_interval_sec: 60,
371            enable_compaction_group_normalize: false,
372            max_normalize_splits_per_round: 4,
373            compact_task_table_size_partition_threshold_low: 128 * 1024 * 1024,
374            compact_task_table_size_partition_threshold_high: 512 * 1024 * 1024,
375            table_high_write_throughput_threshold: 128 * 1024 * 1024,
376            table_low_write_throughput_threshold: 64 * 1024 * 1024,
377            do_not_config_object_storage_lifecycle: true,
378            partition_vnode_count: 32,
379            compaction_task_max_heartbeat_interval_secs: 0,
380            compaction_task_max_progress_interval_secs: 1,
381            compaction_task_id_refill_capacity: 64,
382            compaction_config: None,
383            hybrid_partition_node_count: 4,
384            event_log_enabled: false,
385            event_log_channel_max_size: 1,
386            advertise_addr: "".to_owned(),
387            cached_traces_num: 1,
388            cached_traces_memory_limit_bytes: usize::MAX,
389            enable_trivial_move: true,
390            enable_check_task_level_overlap: true,
391            enable_dropped_column_reclaim: false,
392            object_store_config: ObjectStoreConfig::default(),
393            max_trivial_move_task_count_per_loop: 256,
394            max_get_task_probe_times: 5,
395            secret_store_private_key: Some(
396                hex::decode("0123456789abcdef0123456789abcdef").unwrap(),
397            ),
398            temp_secret_file_dir: "./secrets".to_owned(),
399            actor_cnt_per_worker_parallelism_hard_limit: usize::MAX,
400            actor_cnt_per_worker_parallelism_soft_limit: usize::MAX,
401            split_group_size_ratio: 0.9,
402            table_stat_high_write_throughput_ratio_for_split: 0.5,
403            table_stat_low_write_throughput_ratio_for_merge: 0.7,
404            table_stat_throuput_window_seconds_for_split: 60,
405            table_stat_throuput_window_seconds_for_merge: 240,
406            periodic_scheduling_compaction_group_merge_interval_sec: 60 * 10,
407            compaction_group_merge_dimension_threshold: 1.2,
408            license_key_path: None,
409            compute_client_config: RpcClientConfig::default(),
410            stream_client_config: RpcClientConfig::default(),
411            frontend_client_config: RpcClientConfig::default(),
412            redact_sql_option_keywords: Arc::new(Default::default()),
413            cdc_table_split_init_sleep_interval_splits: 1000,
414            cdc_table_split_init_sleep_duration_millis: 10,
415            cdc_table_split_init_insert_batch_size: 1000,
416            enable_legacy_table_migration: true,
417            refresh_scheduler_interval_sec: 60,
418            pause_on_next_bootstrap_offline: false,
419            table_change_log_insert_batch_size: 1000,
420            table_change_log_delete_batch_size: 1000,
421        }
422    }
423}
424
425impl MetaSrvEnv {
426    pub async fn new(
427        opts: MetaOpts,
428        mut init_system_params: SystemParams,
429        session_init: SessionInitConfig,
430        meta_store_impl: SqlMetaStore,
431    ) -> MetaResult<Self> {
432        let idle_manager = Arc::new(IdleManager::new(opts.max_idle_ms));
433        let stream_client_pool =
434            Arc::new(StreamClientPool::new(1, opts.stream_client_config.clone())); // typically no need for plural clients
435        let frontend_client_pool = Arc::new(FrontendClientPool::new(
436            1,
437            opts.frontend_client_config.clone(),
438        ));
439        let event_log_manager = Arc::new(start_event_log_manager(
440            opts.event_log_enabled,
441            opts.event_log_channel_max_size,
442        ));
443
444        // When license key path is specified, license key from system parameters can be easily
445        // overwritten. So we simply reject this case.
446        if opts.license_key_path.is_some()
447            && init_system_params.license_key
448                != system_param::default::license_key_opt().map(Into::into)
449        {
450            bail!(
451                "argument `--license-key-path` (or env var `RW_LICENSE_KEY_PATH`) and \
452                 system parameter `license_key` (or env var `RW_LICENSE_KEY`) may not \
453                 be set at the same time"
454            );
455        }
456
457        let cluster_first_launch = meta_store_impl.up().await.context(
458            "Failed to initialize the meta store, \
459            this may happen if there's existing metadata incompatible with the current version of RisingWave, \
460            e.g., downgrading from a newer release or a nightly build to an older one. \
461            For a single-node deployment, you may want to reset all data by deleting the data directory, \
462            typically located at `~/.risingwave`.",
463        )?;
464
465        let notification_manager =
466            Arc::new(NotificationManager::new(meta_store_impl.clone()).await);
467        let cluster_id = Cluster::find()
468            .one(&meta_store_impl.conn)
469            .await?
470            .map(|c| c.cluster_id.to_string().into())
471            .unwrap();
472
473        // For new clusters:
474        // - the name of the object store needs to be prefixed according to the object id.
475        //
476        // For old clusters
477        // - the prefix is ​​not divided for the sake of compatibility.
478        init_system_params.use_new_object_prefix_strategy = Some(cluster_first_launch);
479
480        let system_param_controller = Arc::new(
481            SystemParamsController::new(
482                meta_store_impl.clone(),
483                notification_manager.clone(),
484                init_system_params,
485            )
486            .await?,
487        );
488        let session_param_controller = Arc::new(
489            SessionParamsController::new(
490                meta_store_impl.clone(),
491                notification_manager.clone(),
492                session_init,
493            )
494            .await?,
495        );
496        Ok(Self {
497            id_gen_manager_impl: Arc::new(SqlIdGeneratorManager::new(&meta_store_impl.conn).await?),
498            system_param_manager_impl: system_param_controller,
499            session_param_manager_impl: session_param_controller,
500            meta_store_impl: meta_store_impl.clone(),
501            shared_actor_info: SharedActorInfos::new(notification_manager.clone()),
502            notification_manager,
503            stream_client_pool,
504            frontend_client_pool,
505            idle_manager,
506            event_log_manager,
507            cluster_id,
508            hummock_seq: Arc::new(SequenceGenerator::new(meta_store_impl.conn.clone())),
509            opts: opts.into(),
510            // Await trees on the meta node is lightweight, thus always enabled.
511            await_tree_reg: await_tree::Registry::new(Default::default()),
512            actor_id_generator: Arc::new(AtomicU32::new(0)),
513        })
514    }
515
516    pub fn meta_store(&self) -> SqlMetaStore {
517        self.meta_store_impl.clone()
518    }
519
520    pub fn meta_store_ref(&self) -> &SqlMetaStore {
521        &self.meta_store_impl
522    }
523
524    pub fn id_gen_manager(&self) -> &SqlIdGeneratorManagerRef {
525        &self.id_gen_manager_impl
526    }
527
528    pub fn notification_manager_ref(&self) -> NotificationManagerRef {
529        self.notification_manager.clone()
530    }
531
532    pub fn notification_manager(&self) -> &NotificationManager {
533        self.notification_manager.deref()
534    }
535
536    pub fn idle_manager_ref(&self) -> IdleManagerRef {
537        self.idle_manager.clone()
538    }
539
540    pub fn idle_manager(&self) -> &IdleManager {
541        self.idle_manager.deref()
542    }
543
544    pub fn actor_id_generator(&self) -> &AtomicU32 {
545        self.actor_id_generator.deref()
546    }
547
548    pub async fn system_params_reader(&self) -> SystemParamsReader {
549        self.system_param_manager_impl.get_params().await
550    }
551
552    pub fn system_params_manager_impl_ref(&self) -> SystemParamsControllerRef {
553        self.system_param_manager_impl.clone()
554    }
555
556    pub fn session_params_manager_impl_ref(&self) -> SessionParamsControllerRef {
557        self.session_param_manager_impl.clone()
558    }
559
560    pub fn stream_client_pool_ref(&self) -> StreamClientPoolRef {
561        self.stream_client_pool.clone()
562    }
563
564    pub fn stream_client_pool(&self) -> &StreamClientPool {
565        self.stream_client_pool.deref()
566    }
567
568    pub fn frontend_client_pool(&self) -> &FrontendClientPool {
569        self.frontend_client_pool.deref()
570    }
571
572    pub fn cluster_id(&self) -> &ClusterId {
573        &self.cluster_id
574    }
575
576    pub fn event_log_manager_ref(&self) -> EventLogManagerRef {
577        self.event_log_manager.clone()
578    }
579
580    pub fn await_tree_reg(&self) -> &await_tree::Registry {
581        &self.await_tree_reg
582    }
583
584    pub fn shared_actor_infos(&self) -> &SharedActorInfos {
585        &self.shared_actor_info
586    }
587}
588
589#[cfg(any(test, feature = "test"))]
590impl MetaSrvEnv {
591    // Instance for test.
592    pub async fn for_test() -> Self {
593        Self::for_test_opts(MetaOpts::test(false), |_| ()).await
594    }
595
596    pub async fn for_test_opts(
597        opts: MetaOpts,
598        on_test_system_params: impl FnOnce(&mut risingwave_pb::meta::PbSystemParams),
599    ) -> Self {
600        let mut system_params = risingwave_common::system_param::system_params_for_test();
601        on_test_system_params(&mut system_params);
602        Self::new(
603            opts,
604            system_params,
605            Default::default(),
606            SqlMetaStore::for_test().await,
607        )
608        .await
609        .unwrap()
610    }
611}