risingwave_meta/manager/
env.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Deref;
16use std::path::PathBuf;
17use std::sync::Arc;
18use std::sync::atomic::AtomicU32;
19
20use anyhow::Context;
21use risingwave_common::config::{
22    CompactionConfig, DefaultParallelism, ObjectStoreConfig, RpcClientConfig,
23};
24use risingwave_common::session_config::SessionConfig;
25use risingwave_common::system_param::reader::SystemParamsReader;
26use risingwave_common::{bail, system_param};
27use risingwave_meta_model::prelude::Cluster;
28use risingwave_pb::meta::SystemParams;
29use risingwave_rpc_client::{
30    FrontendClientPool, FrontendClientPoolRef, StreamClientPool, StreamClientPoolRef,
31};
32use risingwave_sqlparser::ast::RedactSqlOptionKeywordsRef;
33use sea_orm::EntityTrait;
34
35use crate::MetaResult;
36use crate::barrier::SharedActorInfos;
37use crate::controller::SqlMetaStore;
38use crate::controller::id::{
39    IdGeneratorManager as SqlIdGeneratorManager, IdGeneratorManagerRef as SqlIdGeneratorManagerRef,
40};
41use crate::controller::session_params::{SessionParamsController, SessionParamsControllerRef};
42use crate::controller::system_param::{SystemParamsController, SystemParamsControllerRef};
43use crate::hummock::sequence::SequenceGenerator;
44use crate::manager::event_log::{EventLogManagerRef, start_event_log_manager};
45use crate::manager::{IdleManager, IdleManagerRef, NotificationManager, NotificationManagerRef};
46use crate::model::ClusterId;
47
/// [`MetaSrvEnv`] is the global environment in Meta service. The instance will be shared by all
/// kind of managers inside Meta.
///
/// It is cheap to clone: all heavy state is behind `Arc`s (or Arc-like `*Ref` aliases).
#[derive(Clone)]
pub struct MetaSrvEnv {
    /// id generator manager.
    id_gen_manager_impl: SqlIdGeneratorManagerRef,

    /// system param manager.
    system_param_manager_impl: SystemParamsControllerRef,

    /// session param manager.
    session_param_manager_impl: SessionParamsControllerRef,

    /// meta store.
    meta_store_impl: SqlMetaStore,

    /// notification manager.
    notification_manager: NotificationManagerRef,

    /// Shared actor info (from the barrier module); constructed with the
    /// notification manager in [`MetaSrvEnv::new`].
    pub shared_actor_info: SharedActorInfos,

    /// stream client pool memorization.
    stream_client_pool: StreamClientPoolRef,

    /// rpc client pool for frontend nodes.
    frontend_client_pool: FrontendClientPoolRef,

    /// idle status manager.
    idle_manager: IdleManagerRef,

    /// Manager for the in-memory event log, started in [`MetaSrvEnv::new`].
    event_log_manager: EventLogManagerRef,

    /// Unique identifier of the cluster.
    cluster_id: ClusterId,

    /// Sequence generator for hummock, backed by the meta store connection.
    pub hummock_seq: Arc<SequenceGenerator>,

    /// The await-tree registry of the current meta node.
    await_tree_reg: await_tree::Registry,

    /// options read by all services
    pub opts: Arc<MetaOpts>,

    /// In-memory monotonic counter for actor ids, starting from 0.
    actor_id_generator: Arc<AtomicU32>,
}
93
/// Options shared by all meta service instances
#[derive(Clone, serde::Serialize)]
pub struct MetaOpts {
    /// Whether to enable the recovery of the cluster. If disabled, the meta service will exit on
    /// abnormal cases.
    pub enable_recovery: bool,
    /// Whether to disable the auto-scaling feature.
    pub disable_automatic_parallelism_control: bool,
    /// The number of streaming jobs per scaling operation.
    pub parallelism_control_batch_size: usize,
    /// The period of parallelism control trigger.
    pub parallelism_control_trigger_period_sec: u64,
    /// The first delay of parallelism control.
    pub parallelism_control_trigger_first_delay_sec: u64,
    /// The maximum number of barriers in-flight in the compute nodes.
    pub in_flight_barrier_nums: usize,
    /// After specified seconds of idle (no mview or flush), the process will be exited.
    /// 0 for infinite, process will never be exited due to long idle time.
    pub max_idle_ms: u64,
    /// Whether run in compaction detection test mode
    pub compaction_deterministic_test: bool,
    /// Default parallelism of units for all streaming jobs.
    pub default_parallelism: DefaultParallelism,

    /// Interval of invoking a vacuum job, to remove stale metadata from meta store and objects
    /// from object store.
    pub vacuum_interval_sec: u64,
    /// The spin interval inside a vacuum job. It avoids the vacuum job monopolizing resources of
    /// meta node.
    pub vacuum_spin_interval_ms: u64,
    /// Interval of invoking iceberg garbage collection, to expire old snapshots.
    pub iceberg_gc_interval_sec: u64,
    /// Maximum time to wait for an iceberg compaction task report before the lease expires.
    pub iceberg_compaction_report_timeout_sec: u64,
    /// Maximum time to reuse cached iceberg compaction schedule config before refreshing it.
    pub iceberg_compaction_config_refresh_interval_sec: u64,
    /// Interval in seconds of the time travel vacuum job.
    pub time_travel_vacuum_interval_sec: u64,
    /// Optional cap on the number of versions retained by time travel vacuum.
    pub time_travel_vacuum_max_version_count: Option<u32>,
    /// Interval of hummock version checkpoint.
    pub hummock_version_checkpoint_interval_sec: u64,
    // NOTE(review): semantics not visible in this file — presumably toggles archiving of
    // hummock data; confirm at usage sites.
    pub enable_hummock_data_archive: bool,
    /// Compression algorithm for hummock version checkpoint: "zstd", "lz4", or "none".
    pub checkpoint_compression_algorithm: risingwave_common::config::CheckpointCompression,
    /// Chunk size in bytes for reading large checkpoints.
    pub checkpoint_read_chunk_size: usize,
    /// Maximum number of concurrent chunk reads for large checkpoints.
    pub checkpoint_read_max_in_flight_chunks: usize,
    /// Interval of hummock time travel snapshots.
    pub hummock_time_travel_snapshot_interval: u64,
    /// Batch size when fetching SST info for hummock time travel.
    pub hummock_time_travel_sst_info_fetch_batch_size: usize,
    /// Batch size when inserting SST info for hummock time travel.
    pub hummock_time_travel_sst_info_insert_batch_size: usize,
    /// Batch size when inserting epoch-version mappings for hummock time travel.
    pub hummock_time_travel_epoch_version_insert_batch_size: usize,
    /// Batch size when fetching deltas for hummock time travel.
    pub hummock_time_travel_delta_fetch_batch_size: usize,
    /// Batch size when inserting hummock GC history records.
    pub hummock_gc_history_insert_batch_size: usize,
    /// Batch size when filtering out objects for hummock time travel.
    pub hummock_time_travel_filter_out_objects_batch_size: usize,
    // NOTE(review): presumably selects the legacy (v1) filter-out-objects code path;
    // confirm at usage sites.
    pub hummock_time_travel_filter_out_objects_v1: bool,
    /// Version-list batch size used when filtering out objects for time travel.
    pub hummock_time_travel_filter_out_objects_list_version_batch_size: usize,
    /// Delta-list batch size used when filtering out objects for time travel.
    pub hummock_time_travel_filter_out_objects_list_delta_batch_size: usize,
    /// The minimum delta log number a new checkpoint should compact, otherwise the checkpoint
    /// attempt is rejected. Greater value reduces object store IO, meanwhile it results in
    /// more loss of in memory `HummockVersionCheckpoint::stale_objects` state when meta node is
    /// restarted.
    pub min_delta_log_num_for_hummock_version_checkpoint: u64,
    /// Objects within `min_sst_retention_time_sec` won't be deleted by hummock full GC, even they
    /// are dangling.
    pub min_sst_retention_time_sec: u64,
    /// Interval of automatic hummock full GC.
    pub full_gc_interval_sec: u64,
    /// Max number of object per full GC job can fetch.
    pub full_gc_object_limit: u64,
    /// Duration in seconds to retain garbage collection history data.
    pub gc_history_retention_time_sec: u64,
    /// Max number of inflight time travel query.
    pub max_inflight_time_travel_query: u64,
    /// Enable sanity check when SSTs are committed
    pub enable_committed_sst_sanity_check: bool,
    /// Schedule compaction for all compaction groups with this interval.
    pub periodic_compaction_interval_sec: u64,
    /// Interval of reporting the number of nodes in the cluster.
    pub node_num_monitor_interval_sec: u64,
    /// Whether to protect the drop table operation with incoming sink.
    pub protect_drop_table_with_incoming_sink: bool,
    /// The Prometheus endpoint for Meta Dashboard Service.
    /// The Dashboard service uses this in the following ways:
    /// 1. Query Prometheus for relevant metrics to find Stream Graph Bottleneck, and display it.
    /// 2. Provide cluster diagnostics, at `/api/monitor/diagnose` to troubleshoot cluster.
    ///    These are just examples which show how the Meta Dashboard Service queries Prometheus.
    pub prometheus_endpoint: Option<String>,

    /// The additional selector used when querying Prometheus.
    pub prometheus_selector: Option<String>,

    /// The VPC id of the cluster.
    pub vpc_id: Option<String>,

    /// A usable security group id to assign to a vpc endpoint
    pub security_group_id: Option<String>,

    /// Default tag for the endpoint created when creating a privatelink connection.
    /// Will be appended to the tags specified in the `tags` field in with clause in `create
    /// connection`.
    pub privatelink_endpoint_default_tags: Option<Vec<(String, String)>>,

    /// Schedule `space_reclaim_compaction` for all compaction groups with this interval.
    pub periodic_space_reclaim_compaction_interval_sec: u64,

    /// telemetry enabled in config file or not
    pub telemetry_enabled: bool,
    /// Schedule `ttl_reclaim_compaction` for all compaction groups with this interval.
    pub periodic_ttl_reclaim_compaction_interval_sec: u64,

    /// Schedule `tombstone_reclaim_compaction` for all compaction groups with this interval.
    pub periodic_tombstone_reclaim_compaction_interval_sec: u64,

    /// Schedule the regular compaction-group split job for all compaction groups with this interval.
    pub periodic_scheduling_compaction_group_split_interval_sec: u64,
    /// Whether to enable overlap normalization before the regular merge scheduler.
    pub enable_compaction_group_normalize: bool,
    /// Maximum normalize splits in one scheduler round. Must be greater than 0.
    pub max_normalize_splits_per_round: u64,

    /// Whether config object storage bucket lifecycle to purge stale data.
    pub do_not_config_object_storage_lifecycle: bool,

    /// Number of vnodes per partition (see also `hybrid_partition_node_count` below).
    pub partition_vnode_count: u32,

    /// threshold of high write throughput of state-table, unit: B/sec
    pub table_high_write_throughput_threshold: u64,
    /// threshold of low write throughput of state-table, unit: B/sec
    pub table_low_write_throughput_threshold: u64,

    /// Max heartbeat interval for a compaction task, in seconds.
    pub compaction_task_max_heartbeat_interval_secs: u64,
    /// Max progress-report interval for a compaction task, in seconds.
    pub compaction_task_max_progress_interval_secs: u64,
    /// Capacity for refilling compaction task ids.
    pub compaction_task_id_refill_capacity: u32,
    /// Optional override of the compaction config; `None` means defaults apply.
    pub compaction_config: Option<CompactionConfig>,

    /// hybrid compaction group config
    ///
    /// `hybrid_partition_vnode_count` determines the granularity of vnodes in the hybrid compaction group for SST alignment.
    /// When `hybrid_partition_vnode_count` > 0, in hybrid compaction group
    /// - Tables with high write throughput will be split at vnode granularity
    /// - Tables with high size tables will be split by table granularity
    ///
    /// When `hybrid_partition_vnode_count` = 0, no special alignment operations are performed
    /// for the hybrid compaction group.
    ///
    /// NOTE(review): the field is named `hybrid_partition_node_count` (no `v`) while the doc
    /// refers to `hybrid_partition_vnode_count`; the name is serialized via `serde::Serialize`,
    /// so it is kept as-is for compatibility.
    pub hybrid_partition_node_count: u32,

    /// Whether the event log is enabled.
    pub event_log_enabled: bool,
    /// Max size of the event log channel.
    pub event_log_channel_max_size: u32,
    /// The address this meta node advertises to other nodes.
    pub advertise_addr: String,
    /// The number of traces to be cached in-memory by the tracing collector
    /// embedded in the meta node.
    pub cached_traces_num: u32,
    /// The maximum memory usage in bytes for the tracing collector embedded
    /// in the meta node.
    pub cached_traces_memory_limit_bytes: usize,

    /// l0 picker whether to select trivial move task
    pub enable_trivial_move: bool,

    /// l0 multi level picker whether to check the overlap accuracy between sub levels
    pub enable_check_task_level_overlap: bool,
    /// Whether to reclaim data of dropped columns during compaction.
    pub enable_dropped_column_reclaim: bool,

    /// Whether to split the compaction group when the size of the group exceeds the threshold.
    pub split_group_size_ratio: f64,

    /// The interval in seconds for the refresh scheduler to check and trigger scheduled refreshes.
    pub refresh_scheduler_interval_sec: u64,

    /// To split the compaction group when the high throughput statistics of the group exceeds the threshold.
    pub table_stat_high_write_throughput_ratio_for_split: f64,

    /// To merge the compaction group when the low throughput statistics of the group exceeds the threshold.
    pub table_stat_low_write_throughput_ratio_for_merge: f64,

    /// The window seconds of table throughput statistic history for split compaction group.
    ///
    /// NOTE(review): "throuput" is a typo for "throughput"; the name is serialized via
    /// `serde::Serialize`, so it is kept as-is for compatibility.
    pub table_stat_throuput_window_seconds_for_split: usize,

    /// The window seconds of table throughput statistic history for merge compaction group.
    /// (Same "throuput" typo as above; kept for compatibility.)
    pub table_stat_throuput_window_seconds_for_merge: usize,

    /// The configuration of the object store
    pub object_store_config: ObjectStoreConfig,

    /// The maximum number of trivial move tasks to be picked in a single loop
    pub max_trivial_move_task_count_per_loop: usize,

    /// The maximum number of times to probe for `PullTaskEvent`
    pub max_get_task_probe_times: usize,

    /// Lower table-size threshold (bytes) for partitioning a compact task.
    pub compact_task_table_size_partition_threshold_low: u64,
    /// Upper table-size threshold (bytes) for partitioning a compact task.
    pub compact_task_table_size_partition_threshold_high: u64,

    /// Schedule the compaction-group merge job with this interval, in seconds.
    pub periodic_scheduling_compaction_group_merge_interval_sec: u64,

    /// Dimension threshold used when deciding whether to merge compaction groups.
    pub compaction_group_merge_dimension_threshold: f64,

    // The private key for the secret store, used when the secret is stored in the meta.
    pub secret_store_private_key: Option<Vec<u8>>,
    /// The path of the temp secret file directory.
    pub temp_secret_file_dir: String,

    // Cluster limits
    /// Hard limit on actor count per unit of worker parallelism.
    pub actor_cnt_per_worker_parallelism_hard_limit: usize,
    /// Soft limit on actor count per unit of worker parallelism.
    pub actor_cnt_per_worker_parallelism_soft_limit: usize,

    /// Batch size when inserting table change log entries.
    pub table_change_log_insert_batch_size: u64,
    /// Batch size when deleting table change log entries.
    pub table_change_log_delete_batch_size: u64,

    /// Optional path to a license key file; mutually exclusive with the `license_key`
    /// system parameter (enforced in [`MetaSrvEnv::new`]).
    pub license_key_path: Option<PathBuf>,

    /// RPC client config for compute nodes.
    pub compute_client_config: RpcClientConfig,
    /// RPC client config for the stream client pool.
    pub stream_client_config: RpcClientConfig,
    /// RPC client config for the frontend client pool.
    pub frontend_client_config: RpcClientConfig,
    /// SQL option keywords whose values should be redacted (e.g. in logs).
    pub redact_sql_option_keywords: RedactSqlOptionKeywordsRef,

    /// Sleep after this many splits during CDC table split initialization.
    pub cdc_table_split_init_sleep_interval_splits: u64,
    /// Sleep duration (ms) during CDC table split initialization.
    pub cdc_table_split_init_sleep_duration_millis: u64,
    /// Insert batch size during CDC table split initialization.
    pub cdc_table_split_init_insert_batch_size: u64,

    // NOTE(review): semantics not visible in this file — presumably enables migration of
    // legacy tables; confirm at usage sites.
    pub enable_legacy_table_migration: bool,
    // NOTE(review): presumably pauses the stream on the next bootstrap when offline;
    // confirm at usage sites.
    pub pause_on_next_bootstrap_offline: bool,
}
315
impl MetaOpts {
    /// Default opts for testing. Some tests need `enable_recovery=true`
    ///
    /// The values here are tuned for fast, deterministic unit tests (tiny intervals,
    /// unlimited soft/hard limits, fixed secret key) and must not be used in production.
    pub fn test(enable_recovery: bool) -> Self {
        Self {
            enable_recovery,
            disable_automatic_parallelism_control: false,
            parallelism_control_batch_size: 1,
            parallelism_control_trigger_period_sec: 10,
            parallelism_control_trigger_first_delay_sec: 30,
            in_flight_barrier_nums: 40,
            max_idle_ms: 0,
            compaction_deterministic_test: false,
            default_parallelism: DefaultParallelism::Full,
            vacuum_interval_sec: 30,
            time_travel_vacuum_interval_sec: 30,
            time_travel_vacuum_max_version_count: None,
            vacuum_spin_interval_ms: 0,
            iceberg_gc_interval_sec: 3600,
            iceberg_compaction_report_timeout_sec: 30 * 60,
            iceberg_compaction_config_refresh_interval_sec: 60,
            hummock_version_checkpoint_interval_sec: 30,
            enable_hummock_data_archive: false,
            checkpoint_compression_algorithm:
                risingwave_common::config::CheckpointCompression::Zstd,
            checkpoint_read_chunk_size: 128 * 1024 * 1024,
            checkpoint_read_max_in_flight_chunks: 4,
            hummock_time_travel_snapshot_interval: 0,
            hummock_time_travel_sst_info_fetch_batch_size: 10_000,
            hummock_time_travel_sst_info_insert_batch_size: 10,
            hummock_time_travel_epoch_version_insert_batch_size: 1000,
            hummock_time_travel_delta_fetch_batch_size: 1000,
            hummock_gc_history_insert_batch_size: 1000,
            hummock_time_travel_filter_out_objects_batch_size: 1000,
            hummock_time_travel_filter_out_objects_v1: false,
            hummock_time_travel_filter_out_objects_list_version_batch_size: 10,
            hummock_time_travel_filter_out_objects_list_delta_batch_size: 1000,
            min_delta_log_num_for_hummock_version_checkpoint: 1,
            min_sst_retention_time_sec: 3600 * 24 * 7,
            full_gc_interval_sec: 3600 * 24 * 7,
            full_gc_object_limit: 100_000,
            gc_history_retention_time_sec: 3600 * 24 * 7,
            max_inflight_time_travel_query: 1000,
            enable_committed_sst_sanity_check: false,
            periodic_compaction_interval_sec: 300,
            node_num_monitor_interval_sec: 10,
            protect_drop_table_with_incoming_sink: false,
            prometheus_endpoint: None,
            prometheus_selector: None,
            vpc_id: None,
            security_group_id: None,
            privatelink_endpoint_default_tags: None,
            periodic_space_reclaim_compaction_interval_sec: 60,
            telemetry_enabled: false,
            periodic_ttl_reclaim_compaction_interval_sec: 60,
            periodic_tombstone_reclaim_compaction_interval_sec: 60,
            periodic_scheduling_compaction_group_split_interval_sec: 60,
            enable_compaction_group_normalize: false,
            max_normalize_splits_per_round: 4,
            compact_task_table_size_partition_threshold_low: 128 * 1024 * 1024,
            compact_task_table_size_partition_threshold_high: 512 * 1024 * 1024,
            table_high_write_throughput_threshold: 128 * 1024 * 1024,
            table_low_write_throughput_threshold: 64 * 1024 * 1024,
            do_not_config_object_storage_lifecycle: true,
            partition_vnode_count: 32,
            compaction_task_max_heartbeat_interval_secs: 0,
            compaction_task_max_progress_interval_secs: 1,
            compaction_task_id_refill_capacity: 64,
            compaction_config: None,
            hybrid_partition_node_count: 4,
            event_log_enabled: false,
            event_log_channel_max_size: 1,
            advertise_addr: "".to_owned(),
            cached_traces_num: 1,
            cached_traces_memory_limit_bytes: usize::MAX,
            enable_trivial_move: true,
            enable_check_task_level_overlap: true,
            enable_dropped_column_reclaim: false,
            object_store_config: ObjectStoreConfig::default(),
            max_trivial_move_task_count_per_loop: 256,
            max_get_task_probe_times: 5,
            // Fixed, well-known key for tests only; the `unwrap` cannot fail on this
            // constant hex literal.
            secret_store_private_key: Some(
                hex::decode("0123456789abcdef0123456789abcdef").unwrap(),
            ),
            temp_secret_file_dir: "./secrets".to_owned(),
            actor_cnt_per_worker_parallelism_hard_limit: usize::MAX,
            actor_cnt_per_worker_parallelism_soft_limit: usize::MAX,
            split_group_size_ratio: 0.9,
            table_stat_high_write_throughput_ratio_for_split: 0.5,
            table_stat_low_write_throughput_ratio_for_merge: 0.7,
            table_stat_throuput_window_seconds_for_split: 60,
            table_stat_throuput_window_seconds_for_merge: 240,
            periodic_scheduling_compaction_group_merge_interval_sec: 60 * 10,
            compaction_group_merge_dimension_threshold: 1.2,
            license_key_path: None,
            compute_client_config: RpcClientConfig::default(),
            stream_client_config: RpcClientConfig::default(),
            frontend_client_config: RpcClientConfig::default(),
            redact_sql_option_keywords: Arc::new(Default::default()),
            cdc_table_split_init_sleep_interval_splits: 1000,
            cdc_table_split_init_sleep_duration_millis: 10,
            cdc_table_split_init_insert_batch_size: 1000,
            enable_legacy_table_migration: true,
            refresh_scheduler_interval_sec: 60,
            pause_on_next_bootstrap_offline: false,
            table_change_log_insert_batch_size: 1000,
            table_change_log_delete_batch_size: 1000,
        }
    }
}
425
426impl MetaSrvEnv {
427    pub async fn new(
428        opts: MetaOpts,
429        mut init_system_params: SystemParams,
430        init_session_config: SessionConfig,
431        meta_store_impl: SqlMetaStore,
432    ) -> MetaResult<Self> {
433        let idle_manager = Arc::new(IdleManager::new(opts.max_idle_ms));
434        let stream_client_pool =
435            Arc::new(StreamClientPool::new(1, opts.stream_client_config.clone())); // typically no need for plural clients
436        let frontend_client_pool = Arc::new(FrontendClientPool::new(
437            1,
438            opts.frontend_client_config.clone(),
439        ));
440        let event_log_manager = Arc::new(start_event_log_manager(
441            opts.event_log_enabled,
442            opts.event_log_channel_max_size,
443        ));
444
445        // When license key path is specified, license key from system parameters can be easily
446        // overwritten. So we simply reject this case.
447        if opts.license_key_path.is_some()
448            && init_system_params.license_key
449                != system_param::default::license_key_opt().map(Into::into)
450        {
451            bail!(
452                "argument `--license-key-path` (or env var `RW_LICENSE_KEY_PATH`) and \
453                 system parameter `license_key` (or env var `RW_LICENSE_KEY`) may not \
454                 be set at the same time"
455            );
456        }
457
458        let cluster_first_launch = meta_store_impl.up().await.context(
459            "Failed to initialize the meta store, \
460            this may happen if there's existing metadata incompatible with the current version of RisingWave, \
461            e.g., downgrading from a newer release or a nightly build to an older one. \
462            For a single-node deployment, you may want to reset all data by deleting the data directory, \
463            typically located at `~/.risingwave`.",
464        )?;
465
466        let notification_manager =
467            Arc::new(NotificationManager::new(meta_store_impl.clone()).await);
468        let cluster_id = Cluster::find()
469            .one(&meta_store_impl.conn)
470            .await?
471            .map(|c| c.cluster_id.to_string().into())
472            .unwrap();
473
474        // For new clusters:
475        // - the name of the object store needs to be prefixed according to the object id.
476        //
477        // For old clusters
478        // - the prefix is ​​not divided for the sake of compatibility.
479        init_system_params.use_new_object_prefix_strategy = Some(cluster_first_launch);
480
481        let system_param_controller = Arc::new(
482            SystemParamsController::new(
483                meta_store_impl.clone(),
484                notification_manager.clone(),
485                init_system_params,
486            )
487            .await?,
488        );
489        let session_param_controller = Arc::new(
490            SessionParamsController::new(
491                meta_store_impl.clone(),
492                notification_manager.clone(),
493                init_session_config,
494            )
495            .await?,
496        );
497        Ok(Self {
498            id_gen_manager_impl: Arc::new(SqlIdGeneratorManager::new(&meta_store_impl.conn).await?),
499            system_param_manager_impl: system_param_controller,
500            session_param_manager_impl: session_param_controller,
501            meta_store_impl: meta_store_impl.clone(),
502            shared_actor_info: SharedActorInfos::new(notification_manager.clone()),
503            notification_manager,
504            stream_client_pool,
505            frontend_client_pool,
506            idle_manager,
507            event_log_manager,
508            cluster_id,
509            hummock_seq: Arc::new(SequenceGenerator::new(meta_store_impl.conn.clone())),
510            opts: opts.into(),
511            // Await trees on the meta node is lightweight, thus always enabled.
512            await_tree_reg: await_tree::Registry::new(Default::default()),
513            actor_id_generator: Arc::new(AtomicU32::new(0)),
514        })
515    }
516
517    pub fn meta_store(&self) -> SqlMetaStore {
518        self.meta_store_impl.clone()
519    }
520
521    pub fn meta_store_ref(&self) -> &SqlMetaStore {
522        &self.meta_store_impl
523    }
524
525    pub fn id_gen_manager(&self) -> &SqlIdGeneratorManagerRef {
526        &self.id_gen_manager_impl
527    }
528
529    pub fn notification_manager_ref(&self) -> NotificationManagerRef {
530        self.notification_manager.clone()
531    }
532
533    pub fn notification_manager(&self) -> &NotificationManager {
534        self.notification_manager.deref()
535    }
536
537    pub fn idle_manager_ref(&self) -> IdleManagerRef {
538        self.idle_manager.clone()
539    }
540
541    pub fn idle_manager(&self) -> &IdleManager {
542        self.idle_manager.deref()
543    }
544
545    pub fn actor_id_generator(&self) -> &AtomicU32 {
546        self.actor_id_generator.deref()
547    }
548
549    pub async fn system_params_reader(&self) -> SystemParamsReader {
550        self.system_param_manager_impl.get_params().await
551    }
552
553    pub fn system_params_manager_impl_ref(&self) -> SystemParamsControllerRef {
554        self.system_param_manager_impl.clone()
555    }
556
557    pub fn session_params_manager_impl_ref(&self) -> SessionParamsControllerRef {
558        self.session_param_manager_impl.clone()
559    }
560
561    pub fn stream_client_pool_ref(&self) -> StreamClientPoolRef {
562        self.stream_client_pool.clone()
563    }
564
565    pub fn stream_client_pool(&self) -> &StreamClientPool {
566        self.stream_client_pool.deref()
567    }
568
569    pub fn frontend_client_pool(&self) -> &FrontendClientPool {
570        self.frontend_client_pool.deref()
571    }
572
573    pub fn cluster_id(&self) -> &ClusterId {
574        &self.cluster_id
575    }
576
577    pub fn event_log_manager_ref(&self) -> EventLogManagerRef {
578        self.event_log_manager.clone()
579    }
580
581    pub fn await_tree_reg(&self) -> &await_tree::Registry {
582        &self.await_tree_reg
583    }
584
585    pub fn shared_actor_infos(&self) -> &SharedActorInfos {
586        &self.shared_actor_info
587    }
588}
589
#[cfg(any(test, feature = "test"))]
impl MetaSrvEnv {
    // Instance for test.
    /// Creates an environment for tests, with recovery disabled (`MetaOpts::test(false)`),
    /// default test system parameters, and a test meta store.
    pub async fn for_test() -> Self {
        Self::for_test_opts(MetaOpts::test(false), |_| ()).await
    }

    /// Creates an environment for tests with the given `opts`.
    ///
    /// `on_test_system_params` may mutate the default test system parameters before
    /// the environment is constructed.
    ///
    /// # Panics
    ///
    /// Panics if [`MetaSrvEnv::new`] fails (acceptable in tests).
    pub async fn for_test_opts(
        opts: MetaOpts,
        on_test_system_params: impl FnOnce(&mut risingwave_pb::meta::PbSystemParams),
    ) -> Self {
        let mut system_params = risingwave_common::system_param::system_params_for_test();
        on_test_system_params(&mut system_params);
        Self::new(
            opts,
            system_params,
            Default::default(),
            SqlMetaStore::for_test().await,
        )
        .await
        .unwrap()
    }
}