risingwave_meta/manager/
env.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Deref;
16use std::path::PathBuf;
17use std::sync::Arc;
18
19use anyhow::Context;
20use risingwave_common::config::{
21    CompactionConfig, DefaultParallelism, ObjectStoreConfig, RpcClientConfig,
22};
23use risingwave_common::session_config::SessionConfig;
24use risingwave_common::system_param::reader::SystemParamsReader;
25use risingwave_common::{bail, system_param};
26use risingwave_meta_model::prelude::Cluster;
27use risingwave_pb::meta::SystemParams;
28use risingwave_rpc_client::{
29    FrontendClientPool, FrontendClientPoolRef, StreamClientPool, StreamClientPoolRef,
30};
31use sea_orm::EntityTrait;
32
33use crate::MetaResult;
34use crate::controller::SqlMetaStore;
35use crate::controller::id::{
36    IdGeneratorManager as SqlIdGeneratorManager, IdGeneratorManagerRef as SqlIdGeneratorManagerRef,
37};
38use crate::controller::session_params::{SessionParamsController, SessionParamsControllerRef};
39use crate::controller::system_param::{SystemParamsController, SystemParamsControllerRef};
40use crate::hummock::sequence::SequenceGenerator;
41use crate::manager::event_log::{EventLogManagerRef, start_event_log_manager};
42use crate::manager::{IdleManager, IdleManagerRef, NotificationManager, NotificationManagerRef};
43use crate::model::ClusterId;
44
/// [`MetaSrvEnv`] is the global environment in Meta service. The instance will be shared by all
/// kinds of managers inside Meta.
#[derive(Clone)]
pub struct MetaSrvEnv {
    /// id generator manager.
    id_gen_manager_impl: SqlIdGeneratorManagerRef,

    /// system param manager.
    system_param_manager_impl: SystemParamsControllerRef,

    /// session param manager.
    session_param_manager_impl: SessionParamsControllerRef,

    /// meta store.
    meta_store_impl: SqlMetaStore,

    /// notification manager.
    notification_manager: NotificationManagerRef,

    /// stream client pool memoization.
    stream_client_pool: StreamClientPoolRef,

    /// rpc client pool for frontend nodes.
    frontend_client_pool: FrontendClientPoolRef,

    /// idle status manager.
    idle_manager: IdleManagerRef,

    /// Event log manager, started from `MetaOpts::event_log_enabled` and
    /// `MetaOpts::event_log_channel_max_size`.
    event_log_manager: EventLogManagerRef,

    /// Unique identifier of the cluster.
    cluster_id: ClusterId,

    /// Hummock sequence generator, built on the meta store connection.
    pub hummock_seq: Arc<SequenceGenerator>,

    /// options read by all services
    pub opts: Arc<MetaOpts>,
}
83
/// Options shared by all meta service instances.
#[derive(Clone, serde::Serialize)]
pub struct MetaOpts {
    /// Whether to enable the recovery of the cluster. If disabled, the meta service will exit on
    /// abnormal cases.
    pub enable_recovery: bool,
    /// Whether to disable the auto-scaling feature.
    pub disable_automatic_parallelism_control: bool,
    /// The number of streaming jobs per scaling operation.
    pub parallelism_control_batch_size: usize,
    /// The period of parallelism control trigger.
    pub parallelism_control_trigger_period_sec: u64,
    /// The first delay of parallelism control.
    pub parallelism_control_trigger_first_delay_sec: u64,
    /// The maximum number of barriers in-flight in the compute nodes.
    pub in_flight_barrier_nums: usize,
    /// After the specified number of milliseconds of idle time (no mview or flush), the process
    /// will exit. 0 means infinite: the process never exits due to a long idle time.
    pub max_idle_ms: u64,
    /// Whether to run in compaction deterministic test mode.
    pub compaction_deterministic_test: bool,
    /// Default parallelism of units for all streaming jobs.
    pub default_parallelism: DefaultParallelism,

    /// Interval of invoking a vacuum job, to remove stale metadata from meta store and objects
    /// from object store.
    pub vacuum_interval_sec: u64,
    /// The spin interval inside a vacuum job. It avoids the vacuum job monopolizing resources of
    /// meta node.
    pub vacuum_spin_interval_ms: u64,
    pub time_travel_vacuum_interval_sec: u64,
    /// Interval of hummock version checkpoint.
    pub hummock_version_checkpoint_interval_sec: u64,
    pub enable_hummock_data_archive: bool,
    pub hummock_time_travel_snapshot_interval: u64,
    pub hummock_time_travel_sst_info_fetch_batch_size: usize,
    pub hummock_time_travel_sst_info_insert_batch_size: usize,
    pub hummock_time_travel_epoch_version_insert_batch_size: usize,
    pub hummock_gc_history_insert_batch_size: usize,
    pub hummock_time_travel_filter_out_objects_batch_size: usize,
    /// The minimum delta log number a new checkpoint should compact, otherwise the checkpoint
    /// attempt is rejected. Greater value reduces object store IO, meanwhile it results in
    /// more loss of in memory `HummockVersionCheckpoint::stale_objects` state when meta node is
    /// restarted.
    pub min_delta_log_num_for_hummock_version_checkpoint: u64,
    /// Objects within `min_sst_retention_time_sec` won't be deleted by hummock full GC, even if
    /// they are dangling.
    pub min_sst_retention_time_sec: u64,
    /// Interval of automatic hummock full GC.
    pub full_gc_interval_sec: u64,
    /// Max number of objects a single full GC job can fetch.
    pub full_gc_object_limit: u64,
    /// Duration in seconds to retain garbage collection history data.
    pub gc_history_retention_time_sec: u64,
    /// Max number of in-flight time travel queries.
    pub max_inflight_time_travel_query: u64,
    /// Enable sanity check when SSTs are committed.
    pub enable_committed_sst_sanity_check: bool,
    /// Schedule compaction for all compaction groups with this interval.
    pub periodic_compaction_interval_sec: u64,
    /// Interval of reporting the number of nodes in the cluster.
    pub node_num_monitor_interval_sec: u64,

    /// The Prometheus endpoint for Meta Dashboard Service.
    /// The Dashboard service uses this in the following ways:
    /// 1. Query Prometheus for relevant metrics to find Stream Graph Bottleneck, and display it.
    /// 2. Provide cluster diagnostics, at `/api/monitor/diagnose` to troubleshoot cluster.
    ///    These are just examples which show how the Meta Dashboard Service queries Prometheus.
    pub prometheus_endpoint: Option<String>,

    /// The additional selector used when querying Prometheus.
    pub prometheus_selector: Option<String>,

    /// The VPC id of the cluster.
    pub vpc_id: Option<String>,

    /// A usable security group id to assign to a VPC endpoint.
    pub security_group_id: Option<String>,

    /// Default tag for the endpoint created when creating a privatelink connection.
    /// Will be appended to the tags specified in the `tags` field in with clause in `create
    /// connection`.
    pub privatelink_endpoint_default_tags: Option<Vec<(String, String)>>,

    /// Schedule `space_reclaim_compaction` for all compaction groups with this interval.
    pub periodic_space_reclaim_compaction_interval_sec: u64,

    /// Whether telemetry is enabled in the config file.
    pub telemetry_enabled: bool,
    /// Schedule `ttl_reclaim_compaction` for all compaction groups with this interval.
    pub periodic_ttl_reclaim_compaction_interval_sec: u64,

    /// Schedule `tombstone_reclaim_compaction` for all compaction groups with this interval.
    pub periodic_tombstone_reclaim_compaction_interval_sec: u64,

    /// Interval for periodically scheduling compaction group splits across all compaction
    /// groups.
    pub periodic_scheduling_compaction_group_split_interval_sec: u64,

    /// If true, do NOT configure the object storage bucket lifecycle (which would otherwise be
    /// used to purge stale data).
    pub do_not_config_object_storage_lifecycle: bool,

    pub partition_vnode_count: u32,

    /// threshold of high write throughput of state-table, unit: B/sec
    pub table_high_write_throughput_threshold: u64,
    /// threshold of low write throughput of state-table, unit: B/sec
    pub table_low_write_throughput_threshold: u64,

    pub compaction_task_max_heartbeat_interval_secs: u64,
    pub compaction_task_max_progress_interval_secs: u64,
    pub compaction_config: Option<CompactionConfig>,

    /// Hybrid compaction group config.
    ///
    /// `hybrid_partition_node_count` determines the granularity of vnodes in the hybrid
    /// compaction group for SST alignment.
    ///
    /// When `hybrid_partition_node_count` > 0, in the hybrid compaction group:
    /// - Tables with high write throughput will be split at vnode granularity.
    /// - Tables with large size will be split at table granularity.
    ///
    /// When `hybrid_partition_node_count` = 0, no special alignment operations are performed
    /// for the hybrid compaction group.
    pub hybrid_partition_node_count: u32,

    pub event_log_enabled: bool,
    pub event_log_channel_max_size: u32,
    pub advertise_addr: String,
    /// The number of traces to be cached in-memory by the tracing collector
    /// embedded in the meta node.
    pub cached_traces_num: u32,
    /// The maximum memory usage in bytes for the tracing collector embedded
    /// in the meta node.
    pub cached_traces_memory_limit_bytes: usize,

    /// Whether the L0 picker may select trivial move tasks.
    pub enable_trivial_move: bool,

    /// Whether the L0 multi-level picker checks the overlap accuracy between sub levels.
    pub enable_check_task_level_overlap: bool,
    pub enable_dropped_column_reclaim: bool,

    /// Whether to split the compaction group when the size of the group exceeds the threshold.
    pub split_group_size_ratio: f64,

    /// To split the compaction group when the high throughput statistics of the group exceeds the threshold.
    pub table_stat_high_write_throughput_ratio_for_split: f64,

    /// To merge the compaction group when the low throughput statistics of the group exceeds the threshold.
    pub table_stat_low_write_throughput_ratio_for_merge: f64,

    /// The window seconds of table throughput statistic history for split compaction group.
    pub table_stat_throuput_window_seconds_for_split: usize,

    /// The window seconds of table throughput statistic history for merge compaction group.
    pub table_stat_throuput_window_seconds_for_merge: usize,

    /// The configuration of the object store
    pub object_store_config: ObjectStoreConfig,

    /// The maximum number of trivial move tasks to be picked in a single loop
    pub max_trivial_move_task_count_per_loop: usize,

    /// The maximum number of times to probe for `PullTaskEvent`
    pub max_get_task_probe_times: usize,

    pub compact_task_table_size_partition_threshold_low: u64,
    pub compact_task_table_size_partition_threshold_high: u64,

    pub periodic_scheduling_compaction_group_merge_interval_sec: u64,

    /// The private key for the secret store, used when the secret is stored in the meta.
    pub secret_store_private_key: Option<Vec<u8>>,
    /// The path of the temp secret file directory.
    pub temp_secret_file_dir: String,

    // Cluster limits
    pub actor_cnt_per_worker_parallelism_hard_limit: usize,
    pub actor_cnt_per_worker_parallelism_soft_limit: usize,

    pub license_key_path: Option<PathBuf>,

    pub compute_client_config: RpcClientConfig,
    pub stream_client_config: RpcClientConfig,
    pub frontend_client_config: RpcClientConfig,
}
266
267impl MetaOpts {
268    /// Default opts for testing. Some tests need `enable_recovery=true`
269    pub fn test(enable_recovery: bool) -> Self {
270        Self {
271            enable_recovery,
272            disable_automatic_parallelism_control: false,
273            parallelism_control_batch_size: 1,
274            parallelism_control_trigger_period_sec: 10,
275            parallelism_control_trigger_first_delay_sec: 30,
276            in_flight_barrier_nums: 40,
277            max_idle_ms: 0,
278            compaction_deterministic_test: false,
279            default_parallelism: DefaultParallelism::Full,
280            vacuum_interval_sec: 30,
281            time_travel_vacuum_interval_sec: 30,
282            vacuum_spin_interval_ms: 0,
283            hummock_version_checkpoint_interval_sec: 30,
284            enable_hummock_data_archive: false,
285            hummock_time_travel_snapshot_interval: 0,
286            hummock_time_travel_sst_info_fetch_batch_size: 10_000,
287            hummock_time_travel_sst_info_insert_batch_size: 10,
288            hummock_time_travel_epoch_version_insert_batch_size: 1000,
289            hummock_gc_history_insert_batch_size: 1000,
290            hummock_time_travel_filter_out_objects_batch_size: 1000,
291            min_delta_log_num_for_hummock_version_checkpoint: 1,
292            min_sst_retention_time_sec: 3600 * 24 * 7,
293            full_gc_interval_sec: 3600 * 24 * 7,
294            full_gc_object_limit: 100_000,
295            gc_history_retention_time_sec: 3600 * 24 * 7,
296            max_inflight_time_travel_query: 1000,
297            enable_committed_sst_sanity_check: false,
298            periodic_compaction_interval_sec: 60,
299            node_num_monitor_interval_sec: 10,
300            prometheus_endpoint: None,
301            prometheus_selector: None,
302            vpc_id: None,
303            security_group_id: None,
304            privatelink_endpoint_default_tags: None,
305            periodic_space_reclaim_compaction_interval_sec: 60,
306            telemetry_enabled: false,
307            periodic_ttl_reclaim_compaction_interval_sec: 60,
308            periodic_tombstone_reclaim_compaction_interval_sec: 60,
309            periodic_scheduling_compaction_group_split_interval_sec: 60,
310            compact_task_table_size_partition_threshold_low: 128 * 1024 * 1024,
311            compact_task_table_size_partition_threshold_high: 512 * 1024 * 1024,
312            table_high_write_throughput_threshold: 128 * 1024 * 1024,
313            table_low_write_throughput_threshold: 64 * 1024 * 1024,
314            do_not_config_object_storage_lifecycle: true,
315            partition_vnode_count: 32,
316            compaction_task_max_heartbeat_interval_secs: 0,
317            compaction_task_max_progress_interval_secs: 1,
318            compaction_config: None,
319            hybrid_partition_node_count: 4,
320            event_log_enabled: false,
321            event_log_channel_max_size: 1,
322            advertise_addr: "".to_owned(),
323            cached_traces_num: 1,
324            cached_traces_memory_limit_bytes: usize::MAX,
325            enable_trivial_move: true,
326            enable_check_task_level_overlap: true,
327            enable_dropped_column_reclaim: false,
328            object_store_config: ObjectStoreConfig::default(),
329            max_trivial_move_task_count_per_loop: 256,
330            max_get_task_probe_times: 5,
331            secret_store_private_key: Some(
332                hex::decode("0123456789abcdef0123456789abcdef").unwrap(),
333            ),
334            temp_secret_file_dir: "./secrets".to_owned(),
335            actor_cnt_per_worker_parallelism_hard_limit: usize::MAX,
336            actor_cnt_per_worker_parallelism_soft_limit: usize::MAX,
337            split_group_size_ratio: 0.9,
338            table_stat_high_write_throughput_ratio_for_split: 0.5,
339            table_stat_low_write_throughput_ratio_for_merge: 0.7,
340            table_stat_throuput_window_seconds_for_split: 60,
341            table_stat_throuput_window_seconds_for_merge: 240,
342            periodic_scheduling_compaction_group_merge_interval_sec: 60 * 10,
343            license_key_path: None,
344            compute_client_config: RpcClientConfig::default(),
345            stream_client_config: RpcClientConfig::default(),
346            frontend_client_config: RpcClientConfig::default(),
347        }
348    }
349}
350
351impl MetaSrvEnv {
352    pub async fn new(
353        opts: MetaOpts,
354        mut init_system_params: SystemParams,
355        init_session_config: SessionConfig,
356        meta_store_impl: SqlMetaStore,
357    ) -> MetaResult<Self> {
358        let idle_manager = Arc::new(IdleManager::new(opts.max_idle_ms));
359        let stream_client_pool =
360            Arc::new(StreamClientPool::new(1, opts.stream_client_config.clone())); // typically no need for plural clients
361        let frontend_client_pool = Arc::new(FrontendClientPool::new(
362            1,
363            opts.frontend_client_config.clone(),
364        ));
365        let event_log_manager = Arc::new(start_event_log_manager(
366            opts.event_log_enabled,
367            opts.event_log_channel_max_size,
368        ));
369
370        // When license key path is specified, license key from system parameters can be easily
371        // overwritten. So we simply reject this case.
372        if opts.license_key_path.is_some()
373            && init_system_params.license_key
374                != system_param::default::license_key_opt().map(Into::into)
375        {
376            bail!(
377                "argument `--license-key-path` (or env var `RW_LICENSE_KEY_PATH`) and \
378                 system parameter `license_key` (or env var `RW_LICENSE_KEY`) may not \
379                 be set at the same time"
380            );
381        }
382
383        let cluster_first_launch = meta_store_impl.up().await.context(
384            "Failed to initialize the meta store, \
385            this may happen if there's existing metadata incompatible with the current version of RisingWave, \
386            e.g., downgrading from a newer release or a nightly build to an older one. \
387            For a single-node deployment, you may want to reset all data by deleting the data directory, \
388            typically located at `~/.risingwave`.",
389        )?;
390
391        let notification_manager =
392            Arc::new(NotificationManager::new(meta_store_impl.clone()).await);
393        let cluster_id = Cluster::find()
394            .one(&meta_store_impl.conn)
395            .await?
396            .map(|c| c.cluster_id.to_string().into())
397            .unwrap();
398
399        // For new clusters:
400        // - the name of the object store needs to be prefixed according to the object id.
401        //
402        // For old clusters
403        // - the prefix is ​​not divided for the sake of compatibility.
404        init_system_params.use_new_object_prefix_strategy = Some(cluster_first_launch);
405
406        let system_param_controller = Arc::new(
407            SystemParamsController::new(
408                meta_store_impl.clone(),
409                notification_manager.clone(),
410                init_system_params,
411            )
412            .await?,
413        );
414        let session_param_controller = Arc::new(
415            SessionParamsController::new(
416                meta_store_impl.clone(),
417                notification_manager.clone(),
418                init_session_config,
419            )
420            .await?,
421        );
422        Ok(Self {
423            id_gen_manager_impl: Arc::new(SqlIdGeneratorManager::new(&meta_store_impl.conn).await?),
424            system_param_manager_impl: system_param_controller,
425            session_param_manager_impl: session_param_controller,
426            meta_store_impl: meta_store_impl.clone(),
427            notification_manager,
428            stream_client_pool,
429            frontend_client_pool,
430            idle_manager,
431            event_log_manager,
432            cluster_id,
433            hummock_seq: Arc::new(SequenceGenerator::new(meta_store_impl.conn.clone())),
434            opts: opts.into(),
435        })
436    }
437
438    pub fn meta_store(&self) -> SqlMetaStore {
439        self.meta_store_impl.clone()
440    }
441
442    pub fn meta_store_ref(&self) -> &SqlMetaStore {
443        &self.meta_store_impl
444    }
445
446    pub fn id_gen_manager(&self) -> &SqlIdGeneratorManagerRef {
447        &self.id_gen_manager_impl
448    }
449
450    pub fn notification_manager_ref(&self) -> NotificationManagerRef {
451        self.notification_manager.clone()
452    }
453
454    pub fn notification_manager(&self) -> &NotificationManager {
455        self.notification_manager.deref()
456    }
457
458    pub fn idle_manager_ref(&self) -> IdleManagerRef {
459        self.idle_manager.clone()
460    }
461
462    pub fn idle_manager(&self) -> &IdleManager {
463        self.idle_manager.deref()
464    }
465
466    pub async fn system_params_reader(&self) -> SystemParamsReader {
467        self.system_param_manager_impl.get_params().await
468    }
469
470    pub fn system_params_manager_impl_ref(&self) -> SystemParamsControllerRef {
471        self.system_param_manager_impl.clone()
472    }
473
474    pub fn session_params_manager_impl_ref(&self) -> SessionParamsControllerRef {
475        self.session_param_manager_impl.clone()
476    }
477
478    pub fn stream_client_pool_ref(&self) -> StreamClientPoolRef {
479        self.stream_client_pool.clone()
480    }
481
482    pub fn stream_client_pool(&self) -> &StreamClientPool {
483        self.stream_client_pool.deref()
484    }
485
486    pub fn frontend_client_pool(&self) -> &FrontendClientPool {
487        self.frontend_client_pool.deref()
488    }
489
490    pub fn cluster_id(&self) -> &ClusterId {
491        &self.cluster_id
492    }
493
494    pub fn event_log_manager_ref(&self) -> EventLogManagerRef {
495        self.event_log_manager.clone()
496    }
497}
498
#[cfg(any(test, feature = "test"))]
impl MetaSrvEnv {
    /// Builds a test environment from `MetaOpts::test(false)`, i.e. with recovery disabled.
    pub async fn for_test() -> Self {
        let default_test_opts = MetaOpts::test(false);
        Self::for_test_opts(default_test_opts).await
    }

    /// Builds a test environment from `opts`, the test system parameters, a default session
    /// config, and a test meta store.
    ///
    /// # Panics
    ///
    /// Panics if [`MetaSrvEnv::new`] returns an error.
    pub async fn for_test_opts(opts: MetaOpts) -> Self {
        let system_params = risingwave_common::system_param::system_params_for_test();
        let session_config = Default::default();
        let meta_store = SqlMetaStore::for_test().await;

        let created = Self::new(opts, system_params, session_config, meta_store).await;
        created.unwrap()
    }
}