risingwave_meta_node/
lib.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15#![feature(let_chains)]
16#![cfg_attr(coverage, feature(coverage_attribute))]
17
18mod server;
19
20use std::path::PathBuf;
21use std::time::Duration;
22
23use clap::Parser;
24use educe::Educe;
25pub use error::{MetaError, MetaResult};
26use redact::Secret;
27use risingwave_common::config::OverrideConfig;
28use risingwave_common::license::LicenseKey;
29use risingwave_common::util::meta_addr::MetaAddressStrategy;
30use risingwave_common::util::resource_util;
31use risingwave_common::util::tokio_util::sync::CancellationToken;
32use risingwave_common::{GIT_SHA, RW_VERSION};
33use risingwave_common_heap_profiling::HeapProfiler;
34use risingwave_meta::*;
35use risingwave_meta_service::*;
36pub use rpc::{ElectionClient, ElectionMember};
37use server::rpc_serve;
38pub use server::started::get as is_server_started;
39
40use crate::manager::MetaOpts;
41
// Command-line / environment options of the meta node.
//
// Every field can be supplied as a CLI flag or through the environment variable named in its
// `env = "..."` attribute. Fields carrying `#[override_opts(path = ...)]` additionally override
// the configuration entry at that path when they are set.
//
// NOTE: fields that had no doc comment are documented with plain `//` comments on purpose:
// clap's derive turns `///` doc comments into help text, so adding them would change the CLI
// help output.
#[derive(Educe, Clone, Parser, OverrideConfig)]
#[educe(Debug)]
#[command(version, about = "The central metadata management service")]
pub struct MetaNodeOpts {
    // Address the meta node listens on. Parsed into a socket address in `start` and also used
    // to build the `http://...` meta address in `Opts::meta_addr`.
    // TODO: use `SocketAddr`
    #[clap(long, env = "RW_LISTEN_ADDR", default_value = "127.0.0.1:5690")]
    pub listen_addr: String,

    /// The address for contacting this instance of the service.
    /// This would be synonymous with the service's "public address"
    /// or "identifying address".
    /// It will serve as a unique identifier in cluster
    /// membership and leader election. Must be specified for meta backend.
    #[clap(long, env = "RW_ADVERTISE_ADDR", default_value = "127.0.0.1:5690")]
    pub advertise_addr: String,

    // Optional address of the dashboard; parsed in `start` and passed on as `dashboard_addr`.
    #[clap(long, env = "RW_DASHBOARD_HOST")]
    pub dashboard_host: Option<String>,

    /// We will start a http server at this address via `MetricsManager`.
    /// Then the prometheus instance will poll the metrics from this address.
    #[clap(long, env = "RW_PROMETHEUS_HOST", alias = "prometheus-host")]
    pub prometheus_listener_addr: Option<String>,

    /// Endpoint of the SQL service, make it non-option when SQL service is required.
    #[clap(long, hide = true, env = "RW_SQL_ENDPOINT")]
    pub sql_endpoint: Option<Secret<String>>,

    /// Username of sql backend, required when meta backend set to MySQL or PostgreSQL.
    #[clap(long, hide = true, env = "RW_SQL_USERNAME", default_value = "")]
    pub sql_username: String,

    /// Password of sql backend, required when meta backend set to MySQL or PostgreSQL.
    #[clap(long, hide = true, env = "RW_SQL_PASSWORD", default_value = "")]
    pub sql_password: Secret<String>,

    /// Database of sql backend, required when meta backend set to MySQL or PostgreSQL.
    #[clap(long, hide = true, env = "RW_SQL_DATABASE", default_value = "")]
    pub sql_database: String,

    /// The HTTP REST-API address of the Prometheus instance associated to this cluster.
    /// This address is used to serve `PromQL` queries to Prometheus.
    /// It is also used by Grafana Dashboard Service to fetch metrics and visualize them.
    #[clap(long, env = "RW_PROMETHEUS_ENDPOINT")]
    pub prometheus_endpoint: Option<String>,

    /// The additional selector used when querying Prometheus.
    ///
    /// The format is same as `PromQL`. Example: `instance="foo",namespace="bar"`
    #[clap(long, env = "RW_PROMETHEUS_SELECTOR")]
    pub prometheus_selector: Option<String>,

    /// Default tag for the endpoint created when creating a privatelink connection.
    /// Will be appended to the tags specified in the `tags` field in with clause in `create
    /// connection`.
    // Parsed in `start` as a comma-separated list of `key=value` pairs.
    #[clap(long, hide = true, env = "RW_PRIVATELINK_ENDPOINT_DEFAULT_TAGS")]
    pub privatelink_endpoint_default_tags: Option<String>,

    // Passed through unchanged to `MetaOpts::vpc_id` in `start`.
    #[clap(long, hide = true, env = "RW_VPC_ID")]
    pub vpc_id: Option<String>,

    // Passed through unchanged to `MetaOpts::security_group_id` in `start`.
    #[clap(long, hide = true, env = "RW_VPC_SECURITY_GROUP_ID")]
    pub security_group_id: Option<String>,

    /// The path of `risingwave.toml` configuration file.
    ///
    /// If empty, default configuration values will be used.
    #[clap(long, env = "RW_CONFIG_PATH", default_value = "")]
    pub config_path: String,

    // Overrides `meta.backend`; `start` matches on the resulting value to choose the meta store
    // backend (in-memory, generic SQL, SQLite, PostgreSQL or MySQL).
    #[clap(long, hide = true, env = "RW_BACKEND", value_enum)]
    #[override_opts(path = meta.backend)]
    pub backend: Option<MetaBackend>,

    /// The interval of periodic barrier.
    #[clap(long, hide = true, env = "RW_BARRIER_INTERVAL_MS")]
    #[override_opts(path = system.barrier_interval_ms)]
    pub barrier_interval_ms: Option<u32>,

    /// Target size of the Sstable.
    #[clap(long, hide = true, env = "RW_SSTABLE_SIZE_MB")]
    #[override_opts(path = system.sstable_size_mb)]
    pub sstable_size_mb: Option<u32>,

    /// Size of each block in KB in SST.
    #[clap(long, hide = true, env = "RW_BLOCK_SIZE_KB")]
    #[override_opts(path = system.block_size_kb)]
    pub block_size_kb: Option<u32>,

    /// False positive probability of bloom filter.
    #[clap(long, hide = true, env = "RW_BLOOM_FALSE_POSITIVE")]
    #[override_opts(path = system.bloom_false_positive)]
    pub bloom_false_positive: Option<f64>,

    /// State store url
    #[clap(long, hide = true, env = "RW_STATE_STORE")]
    #[override_opts(path = system.state_store)]
    pub state_store: Option<String>,

    /// Remote directory for storing data and metadata objects.
    #[clap(long, hide = true, env = "RW_DATA_DIRECTORY")]
    #[override_opts(path = system.data_directory)]
    pub data_directory: Option<String>,

    /// Whether config object storage bucket lifecycle to purge stale data.
    // NOTE(review): the field name is negated ("do not config"), so `true` presumably skips the
    // lifecycle configuration — confirm against the consumer of this option.
    #[clap(long, hide = true, env = "RW_DO_NOT_CONFIG_BUCKET_LIFECYCLE")]
    #[override_opts(path = meta.do_not_config_object_storage_lifecycle)]
    pub do_not_config_object_storage_lifecycle: Option<bool>,

    /// Remote storage url for storing snapshots.
    #[clap(long, hide = true, env = "RW_BACKUP_STORAGE_URL")]
    #[override_opts(path = system.backup_storage_url)]
    pub backup_storage_url: Option<String>,

    /// Remote directory for storing snapshots.
    #[clap(long, hide = true, env = "RW_BACKUP_STORAGE_DIRECTORY")]
    #[override_opts(path = system.backup_storage_directory)]
    pub backup_storage_directory: Option<String>,

    /// Enable heap profile dump when memory usage is high.
    // Overrides `server.heap_profiling.dir`.
    #[clap(long, hide = true, env = "RW_HEAP_PROFILING_DIR")]
    #[override_opts(path = server.heap_profiling.dir)]
    pub heap_profiling_dir: Option<String>,

    /// Exit if idle for a certain period of time.
    #[clap(long, hide = true, env = "RW_DANGEROUS_MAX_IDLE_SECS")]
    #[override_opts(path = meta.dangerous_max_idle_secs)]
    pub dangerous_max_idle_secs: Option<u64>,

    /// Endpoint of the connector node.
    // Still accepted on the command line, but not read anywhere in `start`.
    #[deprecated = "connector node has been deprecated."]
    #[clap(long, hide = true, env = "RW_CONNECTOR_RPC_ENDPOINT")]
    pub connector_rpc_endpoint: Option<String>,

    /// The license key to activate enterprise features.
    #[clap(long, hide = true, env = "RW_LICENSE_KEY")]
    #[override_opts(path = system.license_key)]
    pub license_key: Option<LicenseKey>,

    /// The path of the license key file to be watched and hot-reloaded.
    #[clap(long, env = "RW_LICENSE_KEY_PATH")]
    pub license_key_path: Option<PathBuf>,

    /// 128-bit AES key for secret store in HEX format.
    // Excluded from the derived `Debug` output so the key does not end up in the options log
    // line emitted by `start`; hex-decoded there before being handed to `MetaOpts`.
    #[educe(Debug(ignore))] // TODO: use newtype to redact debug impl
    #[clap(long, hide = true, env = "RW_SECRET_STORE_PRIVATE_KEY_HEX")]
    pub secret_store_private_key_hex: Option<String>,

    /// The path of the temp secret file directory.
    #[clap(
        long,
        hide = true,
        env = "RW_TEMP_SECRET_FILE_DIR",
        default_value = "./secrets"
    )]
    pub temp_secret_file_dir: String,
}
199
200impl risingwave_common::opts::Opts for MetaNodeOpts {
201    fn name() -> &'static str {
202        "meta"
203    }
204
205    fn meta_addr(&self) -> MetaAddressStrategy {
206        format!("http://{}", self.listen_addr)
207            .parse()
208            .expect("invalid listen address")
209    }
210}
211
212use std::future::Future;
213use std::pin::Pin;
214
215use risingwave_common::config::{MetaBackend, RwConfig, load_config};
216use tracing::info;
217
218/// Start meta node
219pub fn start(
220    opts: MetaNodeOpts,
221    shutdown: CancellationToken,
222) -> Pin<Box<dyn Future<Output = ()> + Send>> {
223    // WARNING: don't change the function signature. Making it `async fn` will cause
224    // slow compile in release mode.
225    Box::pin(async move {
226        info!("Starting meta node");
227        info!("> options: {:?}", opts);
228        let config = load_config(&opts.config_path, &opts);
229        info!("> config: {:?}", config);
230        info!("> version: {} ({})", RW_VERSION, GIT_SHA);
231        let listen_addr = opts.listen_addr.parse().unwrap();
232        let dashboard_addr = opts.dashboard_host.map(|x| x.parse().unwrap());
233        let prometheus_addr = opts.prometheus_listener_addr.map(|x| x.parse().unwrap());
234        let meta_store_config = config.meta.meta_store_config.clone();
235        let backend = match config.meta.backend {
236            MetaBackend::Mem => MetaStoreBackend::Mem,
237            MetaBackend::Sql => MetaStoreBackend::Sql {
238                endpoint: opts
239                    .sql_endpoint
240                    .expect("sql endpoint is required")
241                    .expose_secret()
242                    .to_string(),
243                config: meta_store_config,
244            },
245            MetaBackend::Sqlite => MetaStoreBackend::Sql {
246                endpoint: format!(
247                    "sqlite://{}?mode=rwc",
248                    opts.sql_endpoint
249                        .expect("sql endpoint is required")
250                        .expose_secret()
251                ),
252                config: meta_store_config,
253            },
254            MetaBackend::Postgres => MetaStoreBackend::Sql {
255                endpoint: format!(
256                    "postgres://{}:{}@{}/{}",
257                    opts.sql_username,
258                    opts.sql_password.expose_secret(),
259                    opts.sql_endpoint
260                        .expect("sql endpoint is required")
261                        .expose_secret(),
262                    opts.sql_database
263                ),
264                config: meta_store_config,
265            },
266            MetaBackend::Mysql => MetaStoreBackend::Sql {
267                endpoint: format!(
268                    "mysql://{}:{}@{}/{}",
269                    opts.sql_username,
270                    opts.sql_password.expose_secret(),
271                    opts.sql_endpoint
272                        .expect("sql endpoint is required")
273                        .expose_secret(),
274                    opts.sql_database
275                ),
276                config: meta_store_config,
277            },
278        };
279        validate_config(&config);
280
281        let total_memory_bytes = resource_util::memory::system_memory_available_bytes();
282        let heap_profiler =
283            HeapProfiler::new(total_memory_bytes, config.server.heap_profiling.clone());
284        // Run a background heap profiler
285        heap_profiler.start();
286
287        let secret_store_private_key = opts
288            .secret_store_private_key_hex
289            .map(|key| hex::decode(key).unwrap());
290        let max_heartbeat_interval =
291            Duration::from_secs(config.meta.max_heartbeat_interval_secs as u64);
292        let max_idle_ms = config.meta.dangerous_max_idle_secs.unwrap_or(0) * 1000;
293        let in_flight_barrier_nums = config.streaming.in_flight_barrier_nums;
294        let privatelink_endpoint_default_tags =
295            opts.privatelink_endpoint_default_tags.map(|tags| {
296                tags.split(',')
297                    .map(|s| {
298                        let key_val = s.split_once('=').unwrap();
299                        (key_val.0.to_owned(), key_val.1.to_owned())
300                    })
301                    .collect()
302            });
303
304        let add_info = AddressInfo {
305            advertise_addr: opts.advertise_addr.to_owned(),
306            listen_addr,
307            prometheus_addr,
308            dashboard_addr,
309        };
310
311        const MIN_TIMEOUT_INTERVAL_SEC: u64 = 20;
312        let compaction_task_max_progress_interval_secs = {
313            let retry_config = &config.storage.object_store.retry;
314            let max_streming_read_timeout_ms = (retry_config.streaming_read_attempt_timeout_ms
315                + retry_config.req_backoff_max_delay_ms)
316                * retry_config.streaming_read_retry_attempts as u64;
317            let max_streaming_upload_timeout_ms = (retry_config
318                .streaming_upload_attempt_timeout_ms
319                + retry_config.req_backoff_max_delay_ms)
320                * retry_config.streaming_upload_retry_attempts as u64;
321            let max_upload_timeout_ms = (retry_config.upload_attempt_timeout_ms
322                + retry_config.req_backoff_max_delay_ms)
323                * retry_config.upload_retry_attempts as u64;
324            let max_read_timeout_ms = (retry_config.read_attempt_timeout_ms
325                + retry_config.req_backoff_max_delay_ms)
326                * retry_config.read_retry_attempts as u64;
327            let max_timeout_ms = max_streming_read_timeout_ms
328                .max(max_upload_timeout_ms)
329                .max(max_streaming_upload_timeout_ms)
330                .max(max_read_timeout_ms)
331                .max(config.meta.compaction_task_max_progress_interval_secs * 1000);
332            max_timeout_ms / 1000
333        } + MIN_TIMEOUT_INTERVAL_SEC;
334
335        rpc_serve(
336            add_info,
337            backend,
338            max_heartbeat_interval,
339            config.meta.meta_leader_lease_secs,
340            MetaOpts {
341                enable_recovery: !config.meta.disable_recovery,
342                disable_automatic_parallelism_control: config
343                    .meta
344                    .disable_automatic_parallelism_control,
345                parallelism_control_batch_size: config.meta.parallelism_control_batch_size,
346                parallelism_control_trigger_period_sec: config
347                    .meta
348                    .parallelism_control_trigger_period_sec,
349                parallelism_control_trigger_first_delay_sec: config
350                    .meta
351                    .parallelism_control_trigger_first_delay_sec,
352                in_flight_barrier_nums,
353                max_idle_ms,
354                compaction_deterministic_test: config.meta.enable_compaction_deterministic,
355                default_parallelism: config.meta.default_parallelism,
356                vacuum_interval_sec: config.meta.vacuum_interval_sec,
357                time_travel_vacuum_interval_sec: config
358                    .meta
359                    .developer
360                    .time_travel_vacuum_interval_sec,
361                vacuum_spin_interval_ms: config.meta.vacuum_spin_interval_ms,
362                hummock_version_checkpoint_interval_sec: config
363                    .meta
364                    .hummock_version_checkpoint_interval_sec,
365                enable_hummock_data_archive: config.meta.enable_hummock_data_archive,
366                hummock_time_travel_snapshot_interval: config
367                    .meta
368                    .hummock_time_travel_snapshot_interval,
369                hummock_time_travel_sst_info_fetch_batch_size: config
370                    .meta
371                    .developer
372                    .hummock_time_travel_sst_info_fetch_batch_size,
373                hummock_time_travel_sst_info_insert_batch_size: config
374                    .meta
375                    .developer
376                    .hummock_time_travel_sst_info_insert_batch_size,
377                hummock_time_travel_epoch_version_insert_batch_size: config
378                    .meta
379                    .developer
380                    .hummock_time_travel_epoch_version_insert_batch_size,
381                hummock_gc_history_insert_batch_size: config
382                    .meta
383                    .developer
384                    .hummock_gc_history_insert_batch_size,
385                hummock_time_travel_filter_out_objects_batch_size: config
386                    .meta
387                    .developer
388                    .hummock_time_travel_filter_out_objects_batch_size,
389                min_delta_log_num_for_hummock_version_checkpoint: config
390                    .meta
391                    .min_delta_log_num_for_hummock_version_checkpoint,
392                min_sst_retention_time_sec: config.meta.min_sst_retention_time_sec,
393                full_gc_interval_sec: config.meta.full_gc_interval_sec,
394                full_gc_object_limit: config.meta.full_gc_object_limit,
395                gc_history_retention_time_sec: config.meta.gc_history_retention_time_sec,
396                max_inflight_time_travel_query: config.meta.max_inflight_time_travel_query,
397                enable_committed_sst_sanity_check: config.meta.enable_committed_sst_sanity_check,
398                periodic_compaction_interval_sec: config.meta.periodic_compaction_interval_sec,
399                node_num_monitor_interval_sec: config.meta.node_num_monitor_interval_sec,
400                prometheus_endpoint: opts.prometheus_endpoint,
401                prometheus_selector: opts.prometheus_selector,
402                vpc_id: opts.vpc_id,
403                security_group_id: opts.security_group_id,
404                privatelink_endpoint_default_tags,
405                periodic_space_reclaim_compaction_interval_sec: config
406                    .meta
407                    .periodic_space_reclaim_compaction_interval_sec,
408                telemetry_enabled: config.server.telemetry_enabled,
409                periodic_ttl_reclaim_compaction_interval_sec: config
410                    .meta
411                    .periodic_ttl_reclaim_compaction_interval_sec,
412                periodic_tombstone_reclaim_compaction_interval_sec: config
413                    .meta
414                    .periodic_tombstone_reclaim_compaction_interval_sec,
415                periodic_scheduling_compaction_group_split_interval_sec: config
416                    .meta
417                    .periodic_scheduling_compaction_group_split_interval_sec,
418                periodic_scheduling_compaction_group_merge_interval_sec: config
419                    .meta
420                    .periodic_scheduling_compaction_group_merge_interval_sec,
421                table_high_write_throughput_threshold: config
422                    .meta
423                    .table_high_write_throughput_threshold,
424                table_low_write_throughput_threshold: config
425                    .meta
426                    .table_low_write_throughput_threshold,
427                partition_vnode_count: config.meta.partition_vnode_count,
428                compact_task_table_size_partition_threshold_low: config
429                    .meta
430                    .compact_task_table_size_partition_threshold_low,
431                compact_task_table_size_partition_threshold_high: config
432                    .meta
433                    .compact_task_table_size_partition_threshold_high,
434                do_not_config_object_storage_lifecycle: config
435                    .meta
436                    .do_not_config_object_storage_lifecycle,
437                compaction_task_max_heartbeat_interval_secs: config
438                    .meta
439                    .compaction_task_max_heartbeat_interval_secs,
440                compaction_task_max_progress_interval_secs,
441                compaction_config: Some(config.meta.compaction_config),
442                hybrid_partition_node_count: config.meta.hybrid_partition_vnode_count,
443                event_log_enabled: config.meta.event_log_enabled,
444                event_log_channel_max_size: config.meta.event_log_channel_max_size,
445                advertise_addr: opts.advertise_addr,
446                cached_traces_num: config.meta.developer.cached_traces_num,
447                cached_traces_memory_limit_bytes: config
448                    .meta
449                    .developer
450                    .cached_traces_memory_limit_bytes,
451                enable_trivial_move: config.meta.developer.enable_trivial_move,
452                enable_check_task_level_overlap: config
453                    .meta
454                    .developer
455                    .enable_check_task_level_overlap,
456                enable_dropped_column_reclaim: config.meta.enable_dropped_column_reclaim,
457                split_group_size_ratio: config.meta.split_group_size_ratio,
458                table_stat_high_write_throughput_ratio_for_split: config
459                    .meta
460                    .table_stat_high_write_throughput_ratio_for_split,
461                table_stat_low_write_throughput_ratio_for_merge: config
462                    .meta
463                    .table_stat_low_write_throughput_ratio_for_merge,
464                table_stat_throuput_window_seconds_for_split: config
465                    .meta
466                    .table_stat_throuput_window_seconds_for_split,
467                table_stat_throuput_window_seconds_for_merge: config
468                    .meta
469                    .table_stat_throuput_window_seconds_for_merge,
470                object_store_config: config.storage.object_store,
471                max_trivial_move_task_count_per_loop: config
472                    .meta
473                    .developer
474                    .max_trivial_move_task_count_per_loop,
475                max_get_task_probe_times: config.meta.developer.max_get_task_probe_times,
476                secret_store_private_key,
477                temp_secret_file_dir: opts.temp_secret_file_dir,
478                actor_cnt_per_worker_parallelism_hard_limit: config
479                    .meta
480                    .developer
481                    .actor_cnt_per_worker_parallelism_hard_limit,
482                actor_cnt_per_worker_parallelism_soft_limit: config
483                    .meta
484                    .developer
485                    .actor_cnt_per_worker_parallelism_soft_limit,
486                license_key_path: opts.license_key_path,
487                compute_client_config: config.meta.developer.compute_client_config.clone(),
488                stream_client_config: config.meta.developer.stream_client_config.clone(),
489                frontend_client_config: config.meta.developer.frontend_client_config.clone(),
490            },
491            config.system.into_init_system_params(),
492            Default::default(),
493            shutdown,
494        )
495        .await
496        .unwrap();
497    })
498}
499
500fn validate_config(config: &RwConfig) {
501    if config.meta.meta_leader_lease_secs <= 2 {
502        let error_msg = "meta leader lease secs should be larger than 2";
503        tracing::error!(error_msg);
504        panic!("{}", error_msg);
505    }
506
507    if config.meta.parallelism_control_batch_size == 0 {
508        let error_msg = "parallelism control batch size should be larger than 0";
509        tracing::error!(error_msg);
510        panic!("{}", error_msg);
511    }
512}