risingwave_meta/manager/
env.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Deref;
16use std::path::PathBuf;
17use std::sync::Arc;
18
19use anyhow::Context;
20use risingwave_common::config::{
21    CompactionConfig, DefaultParallelism, ObjectStoreConfig, RpcClientConfig,
22};
23use risingwave_common::session_config::SessionConfig;
24use risingwave_common::system_param::reader::SystemParamsReader;
25use risingwave_common::{bail, system_param};
26use risingwave_meta_model::prelude::Cluster;
27use risingwave_pb::meta::SystemParams;
28use risingwave_rpc_client::{
29    FrontendClientPool, FrontendClientPoolRef, StreamClientPool, StreamClientPoolRef,
30};
31use risingwave_sqlparser::ast::RedactSqlOptionKeywordsRef;
32use sea_orm::EntityTrait;
33
34use crate::MetaResult;
35use crate::barrier::SharedActorInfos;
36use crate::controller::SqlMetaStore;
37use crate::controller::id::{
38    IdGeneratorManager as SqlIdGeneratorManager, IdGeneratorManagerRef as SqlIdGeneratorManagerRef,
39};
40use crate::controller::session_params::{SessionParamsController, SessionParamsControllerRef};
41use crate::controller::system_param::{SystemParamsController, SystemParamsControllerRef};
42use crate::hummock::sequence::SequenceGenerator;
43use crate::manager::event_log::{EventLogManagerRef, start_event_log_manager};
44use crate::manager::{IdleManager, IdleManagerRef, NotificationManager, NotificationManagerRef};
45use crate::model::ClusterId;
46
47/// [`MetaSrvEnv`] is the global environment in Meta service. The instance will be shared by all
48/// kind of managers inside Meta.
49#[derive(Clone)]
50pub struct MetaSrvEnv {
51    /// id generator manager.
52    id_gen_manager_impl: SqlIdGeneratorManagerRef,
53
54    /// system param manager.
55    system_param_manager_impl: SystemParamsControllerRef,
56
57    /// session param manager.
58    session_param_manager_impl: SessionParamsControllerRef,
59
60    /// meta store.
61    meta_store_impl: SqlMetaStore,
62
63    /// notification manager.
64    notification_manager: NotificationManagerRef,
65
66    shared_actor_info: SharedActorInfos,
67
68    /// stream client pool memorization.
69    stream_client_pool: StreamClientPoolRef,
70
71    /// rpc client pool for frontend nodes.
72    frontend_client_pool: FrontendClientPoolRef,
73
74    /// idle status manager.
75    idle_manager: IdleManagerRef,
76
77    event_log_manager: EventLogManagerRef,
78
79    /// Unique identifier of the cluster.
80    cluster_id: ClusterId,
81
82    pub hummock_seq: Arc<SequenceGenerator>,
83
84    /// The await-tree registry of the current meta node.
85    await_tree_reg: await_tree::Registry,
86
87    /// options read by all services
88    pub opts: Arc<MetaOpts>,
89}
90
91/// Options shared by all meta service instances
92#[derive(Clone, serde::Serialize)]
93pub struct MetaOpts {
94    /// Whether to enable the recovery of the cluster. If disabled, the meta service will exit on
95    /// abnormal cases.
96    pub enable_recovery: bool,
97    /// Whether to disable the auto-scaling feature.
98    pub disable_automatic_parallelism_control: bool,
99    /// The number of streaming jobs per scaling operation.
100    pub parallelism_control_batch_size: usize,
101    /// The period of parallelism control trigger.
102    pub parallelism_control_trigger_period_sec: u64,
103    /// The first delay of parallelism control.
104    pub parallelism_control_trigger_first_delay_sec: u64,
105    /// The maximum number of barriers in-flight in the compute nodes.
106    pub in_flight_barrier_nums: usize,
107    /// After specified seconds of idle (no mview or flush), the process will be exited.
108    /// 0 for infinite, process will never be exited due to long idle time.
109    pub max_idle_ms: u64,
110    /// Whether run in compaction detection test mode
111    pub compaction_deterministic_test: bool,
112    /// Default parallelism of units for all streaming jobs.
113    pub default_parallelism: DefaultParallelism,
114
115    /// Interval of invoking a vacuum job, to remove stale metadata from meta store and objects
116    /// from object store.
117    pub vacuum_interval_sec: u64,
118    /// The spin interval inside a vacuum job. It avoids the vacuum job monopolizing resources of
119    /// meta node.
120    pub vacuum_spin_interval_ms: u64,
121    pub time_travel_vacuum_interval_sec: u64,
122    /// Interval of hummock version checkpoint.
123    pub hummock_version_checkpoint_interval_sec: u64,
124    pub enable_hummock_data_archive: bool,
125    pub hummock_time_travel_snapshot_interval: u64,
126    pub hummock_time_travel_sst_info_fetch_batch_size: usize,
127    pub hummock_time_travel_sst_info_insert_batch_size: usize,
128    pub hummock_time_travel_epoch_version_insert_batch_size: usize,
129    pub hummock_gc_history_insert_batch_size: usize,
130    pub hummock_time_travel_filter_out_objects_batch_size: usize,
131    pub hummock_time_travel_filter_out_objects_v1: bool,
132    pub hummock_time_travel_filter_out_objects_list_version_batch_size: usize,
133    pub hummock_time_travel_filter_out_objects_list_delta_batch_size: usize,
134    /// The minimum delta log number a new checkpoint should compact, otherwise the checkpoint
135    /// attempt is rejected. Greater value reduces object store IO, meanwhile it results in
136    /// more loss of in memory `HummockVersionCheckpoint::stale_objects` state when meta node is
137    /// restarted.
138    pub min_delta_log_num_for_hummock_version_checkpoint: u64,
139    /// Objects within `min_sst_retention_time_sec` won't be deleted by hummock full GC, even they
140    /// are dangling.
141    pub min_sst_retention_time_sec: u64,
142    /// Interval of automatic hummock full GC.
143    pub full_gc_interval_sec: u64,
144    /// Max number of object per full GC job can fetch.
145    pub full_gc_object_limit: u64,
146    /// Duration in seconds to retain garbage collection history data.
147    pub gc_history_retention_time_sec: u64,
148    /// Max number of inflight time travel query.
149    pub max_inflight_time_travel_query: u64,
150    /// Enable sanity check when SSTs are committed
151    pub enable_committed_sst_sanity_check: bool,
152    /// Schedule compaction for all compaction groups with this interval.
153    pub periodic_compaction_interval_sec: u64,
154    /// Interval of reporting the number of nodes in the cluster.
155    pub node_num_monitor_interval_sec: u64,
156    /// Whether to protect the drop table operation with incoming sink.
157    pub protect_drop_table_with_incoming_sink: bool,
158    /// The Prometheus endpoint for Meta Dashboard Service.
159    /// The Dashboard service uses this in the following ways:
160    /// 1. Query Prometheus for relevant metrics to find Stream Graph Bottleneck, and display it.
161    /// 2. Provide cluster diagnostics, at `/api/monitor/diagnose` to troubleshoot cluster.
162    ///    These are just examples which show how the Meta Dashboard Service queries Prometheus.
163    pub prometheus_endpoint: Option<String>,
164
165    /// The additional selector used when querying Prometheus.
166    pub prometheus_selector: Option<String>,
167
168    /// The VPC id of the cluster.
169    pub vpc_id: Option<String>,
170
171    /// A usable security group id to assign to a vpc endpoint
172    pub security_group_id: Option<String>,
173
174    /// Default tag for the endpoint created when creating a privatelink connection.
175    /// Will be appended to the tags specified in the `tags` field in with clause in `create
176    /// connection`.
177    pub privatelink_endpoint_default_tags: Option<Vec<(String, String)>>,
178
179    /// Schedule `space_reclaim_compaction` for all compaction groups with this interval.
180    pub periodic_space_reclaim_compaction_interval_sec: u64,
181
182    /// telemetry enabled in config file or not
183    pub telemetry_enabled: bool,
184    /// Schedule `ttl_reclaim_compaction` for all compaction groups with this interval.
185    pub periodic_ttl_reclaim_compaction_interval_sec: u64,
186
187    /// Schedule `tombstone_reclaim_compaction` for all compaction groups with this interval.
188    pub periodic_tombstone_reclaim_compaction_interval_sec: u64,
189
190    /// Schedule `periodic_scheduling_compaction_group_split_interval_sec` for all compaction groups with this interval.
191    pub periodic_scheduling_compaction_group_split_interval_sec: u64,
192
193    /// Whether config object storage bucket lifecycle to purge stale data.
194    pub do_not_config_object_storage_lifecycle: bool,
195
196    pub partition_vnode_count: u32,
197
198    /// threshold of high write throughput of state-table, unit: B/sec
199    pub table_high_write_throughput_threshold: u64,
200    /// threshold of low write throughput of state-table, unit: B/sec
201    pub table_low_write_throughput_threshold: u64,
202
203    pub compaction_task_max_heartbeat_interval_secs: u64,
204    pub compaction_task_max_progress_interval_secs: u64,
205    pub compaction_config: Option<CompactionConfig>,
206
207    /// hybird compaction group config
208    ///
209    /// `hybrid_partition_vnode_count` determines the granularity of vnodes in the hybrid compaction group for SST alignment.
210    /// When `hybrid_partition_vnode_count` > 0, in hybrid compaction group
211    /// - Tables with high write throughput will be split at vnode granularity
212    /// - Tables with high size tables will be split by table granularity
213    ///   When `hybrid_partition_vnode_count` = 0,no longer be special alignment operations for the hybird compaction group
214    pub hybrid_partition_node_count: u32,
215
216    pub event_log_enabled: bool,
217    pub event_log_channel_max_size: u32,
218    pub advertise_addr: String,
219    /// The number of traces to be cached in-memory by the tracing collector
220    /// embedded in the meta node.
221    pub cached_traces_num: u32,
222    /// The maximum memory usage in bytes for the tracing collector embedded
223    /// in the meta node.
224    pub cached_traces_memory_limit_bytes: usize,
225
226    /// l0 picker whether to select trivial move task
227    pub enable_trivial_move: bool,
228
229    /// l0 multi level picker whether to check the overlap accuracy between sub levels
230    pub enable_check_task_level_overlap: bool,
231    pub enable_dropped_column_reclaim: bool,
232
233    /// Whether to split the compaction group when the size of the group exceeds the threshold.
234    pub split_group_size_ratio: f64,
235
236    /// To split the compaction group when the high throughput statistics of the group exceeds the threshold.
237    pub table_stat_high_write_throughput_ratio_for_split: f64,
238
239    /// To merge the compaction group when the low throughput statistics of the group exceeds the threshold.
240    pub table_stat_low_write_throughput_ratio_for_merge: f64,
241
242    /// The window seconds of table throughput statistic history for split compaction group.
243    pub table_stat_throuput_window_seconds_for_split: usize,
244
245    /// The window seconds of table throughput statistic history for merge compaction group.
246    pub table_stat_throuput_window_seconds_for_merge: usize,
247
248    /// The configuration of the object store
249    pub object_store_config: ObjectStoreConfig,
250
251    /// The maximum number of trivial move tasks to be picked in a single loop
252    pub max_trivial_move_task_count_per_loop: usize,
253
254    /// The maximum number of times to probe for `PullTaskEvent`
255    pub max_get_task_probe_times: usize,
256
257    pub compact_task_table_size_partition_threshold_low: u64,
258    pub compact_task_table_size_partition_threshold_high: u64,
259
260    pub periodic_scheduling_compaction_group_merge_interval_sec: u64,
261
262    pub compaction_group_merge_dimension_threshold: f64,
263
264    // The private key for the secret store, used when the secret is stored in the meta.
265    pub secret_store_private_key: Option<Vec<u8>>,
266    /// The path of the temp secret file directory.
267    pub temp_secret_file_dir: String,
268
269    // Cluster limits
270    pub actor_cnt_per_worker_parallelism_hard_limit: usize,
271    pub actor_cnt_per_worker_parallelism_soft_limit: usize,
272
273    pub license_key_path: Option<PathBuf>,
274
275    pub compute_client_config: RpcClientConfig,
276    pub stream_client_config: RpcClientConfig,
277    pub frontend_client_config: RpcClientConfig,
278    pub redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
279
280    pub cdc_table_split_init_sleep_interval_splits: u64,
281    pub cdc_table_split_init_sleep_duration_millis: u64,
282    pub cdc_table_split_init_insert_batch_size: u64,
283}
284
285impl MetaOpts {
286    /// Default opts for testing. Some tests need `enable_recovery=true`
287    pub fn test(enable_recovery: bool) -> Self {
288        Self {
289            enable_recovery,
290            disable_automatic_parallelism_control: false,
291            parallelism_control_batch_size: 1,
292            parallelism_control_trigger_period_sec: 10,
293            parallelism_control_trigger_first_delay_sec: 30,
294            in_flight_barrier_nums: 40,
295            max_idle_ms: 0,
296            compaction_deterministic_test: false,
297            default_parallelism: DefaultParallelism::Full,
298            vacuum_interval_sec: 30,
299            time_travel_vacuum_interval_sec: 30,
300            vacuum_spin_interval_ms: 0,
301            hummock_version_checkpoint_interval_sec: 30,
302            enable_hummock_data_archive: false,
303            hummock_time_travel_snapshot_interval: 0,
304            hummock_time_travel_sst_info_fetch_batch_size: 10_000,
305            hummock_time_travel_sst_info_insert_batch_size: 10,
306            hummock_time_travel_epoch_version_insert_batch_size: 1000,
307            hummock_gc_history_insert_batch_size: 1000,
308            hummock_time_travel_filter_out_objects_batch_size: 1000,
309            hummock_time_travel_filter_out_objects_v1: false,
310            hummock_time_travel_filter_out_objects_list_version_batch_size: 10,
311            hummock_time_travel_filter_out_objects_list_delta_batch_size: 1000,
312            min_delta_log_num_for_hummock_version_checkpoint: 1,
313            min_sst_retention_time_sec: 3600 * 24 * 7,
314            full_gc_interval_sec: 3600 * 24 * 7,
315            full_gc_object_limit: 100_000,
316            gc_history_retention_time_sec: 3600 * 24 * 7,
317            max_inflight_time_travel_query: 1000,
318            enable_committed_sst_sanity_check: false,
319            periodic_compaction_interval_sec: 60,
320            node_num_monitor_interval_sec: 10,
321            protect_drop_table_with_incoming_sink: false,
322            prometheus_endpoint: None,
323            prometheus_selector: None,
324            vpc_id: None,
325            security_group_id: None,
326            privatelink_endpoint_default_tags: None,
327            periodic_space_reclaim_compaction_interval_sec: 60,
328            telemetry_enabled: false,
329            periodic_ttl_reclaim_compaction_interval_sec: 60,
330            periodic_tombstone_reclaim_compaction_interval_sec: 60,
331            periodic_scheduling_compaction_group_split_interval_sec: 60,
332            compact_task_table_size_partition_threshold_low: 128 * 1024 * 1024,
333            compact_task_table_size_partition_threshold_high: 512 * 1024 * 1024,
334            table_high_write_throughput_threshold: 128 * 1024 * 1024,
335            table_low_write_throughput_threshold: 64 * 1024 * 1024,
336            do_not_config_object_storage_lifecycle: true,
337            partition_vnode_count: 32,
338            compaction_task_max_heartbeat_interval_secs: 0,
339            compaction_task_max_progress_interval_secs: 1,
340            compaction_config: None,
341            hybrid_partition_node_count: 4,
342            event_log_enabled: false,
343            event_log_channel_max_size: 1,
344            advertise_addr: "".to_owned(),
345            cached_traces_num: 1,
346            cached_traces_memory_limit_bytes: usize::MAX,
347            enable_trivial_move: true,
348            enable_check_task_level_overlap: true,
349            enable_dropped_column_reclaim: false,
350            object_store_config: ObjectStoreConfig::default(),
351            max_trivial_move_task_count_per_loop: 256,
352            max_get_task_probe_times: 5,
353            secret_store_private_key: Some(
354                hex::decode("0123456789abcdef0123456789abcdef").unwrap(),
355            ),
356            temp_secret_file_dir: "./secrets".to_owned(),
357            actor_cnt_per_worker_parallelism_hard_limit: usize::MAX,
358            actor_cnt_per_worker_parallelism_soft_limit: usize::MAX,
359            split_group_size_ratio: 0.9,
360            table_stat_high_write_throughput_ratio_for_split: 0.5,
361            table_stat_low_write_throughput_ratio_for_merge: 0.7,
362            table_stat_throuput_window_seconds_for_split: 60,
363            table_stat_throuput_window_seconds_for_merge: 240,
364            periodic_scheduling_compaction_group_merge_interval_sec: 60 * 10,
365            compaction_group_merge_dimension_threshold: 1.2,
366            license_key_path: None,
367            compute_client_config: RpcClientConfig::default(),
368            stream_client_config: RpcClientConfig::default(),
369            frontend_client_config: RpcClientConfig::default(),
370            redact_sql_option_keywords: Arc::new(Default::default()),
371            cdc_table_split_init_sleep_interval_splits: 1000,
372            cdc_table_split_init_sleep_duration_millis: 10,
373            cdc_table_split_init_insert_batch_size: 1000,
374        }
375    }
376}
377
378impl MetaSrvEnv {
379    pub async fn new(
380        opts: MetaOpts,
381        mut init_system_params: SystemParams,
382        init_session_config: SessionConfig,
383        meta_store_impl: SqlMetaStore,
384    ) -> MetaResult<Self> {
385        let idle_manager = Arc::new(IdleManager::new(opts.max_idle_ms));
386        let stream_client_pool =
387            Arc::new(StreamClientPool::new(1, opts.stream_client_config.clone())); // typically no need for plural clients
388        let frontend_client_pool = Arc::new(FrontendClientPool::new(
389            1,
390            opts.frontend_client_config.clone(),
391        ));
392        let event_log_manager = Arc::new(start_event_log_manager(
393            opts.event_log_enabled,
394            opts.event_log_channel_max_size,
395        ));
396
397        // When license key path is specified, license key from system parameters can be easily
398        // overwritten. So we simply reject this case.
399        if opts.license_key_path.is_some()
400            && init_system_params.license_key
401                != system_param::default::license_key_opt().map(Into::into)
402        {
403            bail!(
404                "argument `--license-key-path` (or env var `RW_LICENSE_KEY_PATH`) and \
405                 system parameter `license_key` (or env var `RW_LICENSE_KEY`) may not \
406                 be set at the same time"
407            );
408        }
409
410        let cluster_first_launch = meta_store_impl.up().await.context(
411            "Failed to initialize the meta store, \
412            this may happen if there's existing metadata incompatible with the current version of RisingWave, \
413            e.g., downgrading from a newer release or a nightly build to an older one. \
414            For a single-node deployment, you may want to reset all data by deleting the data directory, \
415            typically located at `~/.risingwave`.",
416        )?;
417
418        let notification_manager =
419            Arc::new(NotificationManager::new(meta_store_impl.clone()).await);
420        let cluster_id = Cluster::find()
421            .one(&meta_store_impl.conn)
422            .await?
423            .map(|c| c.cluster_id.to_string().into())
424            .unwrap();
425
426        // For new clusters:
427        // - the name of the object store needs to be prefixed according to the object id.
428        //
429        // For old clusters
430        // - the prefix is ​​not divided for the sake of compatibility.
431        init_system_params.use_new_object_prefix_strategy = Some(cluster_first_launch);
432
433        let system_param_controller = Arc::new(
434            SystemParamsController::new(
435                meta_store_impl.clone(),
436                notification_manager.clone(),
437                init_system_params,
438            )
439            .await?,
440        );
441        let session_param_controller = Arc::new(
442            SessionParamsController::new(
443                meta_store_impl.clone(),
444                notification_manager.clone(),
445                init_session_config,
446            )
447            .await?,
448        );
449        Ok(Self {
450            id_gen_manager_impl: Arc::new(SqlIdGeneratorManager::new(&meta_store_impl.conn).await?),
451            system_param_manager_impl: system_param_controller,
452            session_param_manager_impl: session_param_controller,
453            meta_store_impl: meta_store_impl.clone(),
454            shared_actor_info: SharedActorInfos::new(notification_manager.clone()),
455            notification_manager,
456            stream_client_pool,
457            frontend_client_pool,
458            idle_manager,
459            event_log_manager,
460            cluster_id,
461            hummock_seq: Arc::new(SequenceGenerator::new(meta_store_impl.conn.clone())),
462            opts: opts.into(),
463            // Await trees on the meta node is lightweight, thus always enabled.
464            await_tree_reg: await_tree::Registry::new(Default::default()),
465        })
466    }
467
468    pub fn meta_store(&self) -> SqlMetaStore {
469        self.meta_store_impl.clone()
470    }
471
472    pub fn meta_store_ref(&self) -> &SqlMetaStore {
473        &self.meta_store_impl
474    }
475
476    pub fn id_gen_manager(&self) -> &SqlIdGeneratorManagerRef {
477        &self.id_gen_manager_impl
478    }
479
480    pub fn notification_manager_ref(&self) -> NotificationManagerRef {
481        self.notification_manager.clone()
482    }
483
484    pub fn notification_manager(&self) -> &NotificationManager {
485        self.notification_manager.deref()
486    }
487
488    pub fn idle_manager_ref(&self) -> IdleManagerRef {
489        self.idle_manager.clone()
490    }
491
492    pub fn idle_manager(&self) -> &IdleManager {
493        self.idle_manager.deref()
494    }
495
496    pub async fn system_params_reader(&self) -> SystemParamsReader {
497        self.system_param_manager_impl.get_params().await
498    }
499
500    pub fn system_params_manager_impl_ref(&self) -> SystemParamsControllerRef {
501        self.system_param_manager_impl.clone()
502    }
503
504    pub fn session_params_manager_impl_ref(&self) -> SessionParamsControllerRef {
505        self.session_param_manager_impl.clone()
506    }
507
508    pub fn stream_client_pool_ref(&self) -> StreamClientPoolRef {
509        self.stream_client_pool.clone()
510    }
511
512    pub fn stream_client_pool(&self) -> &StreamClientPool {
513        self.stream_client_pool.deref()
514    }
515
516    pub fn frontend_client_pool(&self) -> &FrontendClientPool {
517        self.frontend_client_pool.deref()
518    }
519
520    pub fn cluster_id(&self) -> &ClusterId {
521        &self.cluster_id
522    }
523
524    pub fn event_log_manager_ref(&self) -> EventLogManagerRef {
525        self.event_log_manager.clone()
526    }
527
528    pub fn await_tree_reg(&self) -> &await_tree::Registry {
529        &self.await_tree_reg
530    }
531
532    pub(crate) fn shared_actor_infos(&self) -> &SharedActorInfos {
533        &self.shared_actor_info
534    }
535}
536
537#[cfg(any(test, feature = "test"))]
538impl MetaSrvEnv {
539    // Instance for test.
540    pub async fn for_test() -> Self {
541        Self::for_test_opts(MetaOpts::test(false), |_| ()).await
542    }
543
544    pub async fn for_test_opts(
545        opts: MetaOpts,
546        on_test_system_params: impl FnOnce(&mut risingwave_pb::meta::PbSystemParams),
547    ) -> Self {
548        let mut system_params = risingwave_common::system_param::system_params_for_test();
549        on_test_system_params(&mut system_params);
550        Self::new(
551            opts,
552            system_params,
553            Default::default(),
554            SqlMetaStore::for_test().await,
555        )
556        .await
557        .unwrap()
558    }
559}