risingwave_meta_node/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![feature(let_chains)]
16#![feature(coverage_attribute)]
17
18mod server;
19
20use std::path::PathBuf;
21use std::time::Duration;
22
23use clap::Parser;
24use educe::Educe;
25pub use error::{MetaError, MetaResult};
26use redact::Secret;
27use risingwave_common::config::OverrideConfig;
28use risingwave_common::license::LicenseKey;
29use risingwave_common::util::meta_addr::MetaAddressStrategy;
30use risingwave_common::util::resource_util;
31use risingwave_common::util::tokio_util::sync::CancellationToken;
32use risingwave_common::{GIT_SHA, RW_VERSION};
33use risingwave_common_heap_profiling::HeapProfiler;
34use risingwave_meta::*;
35use risingwave_meta_service::*;
36pub use rpc::{ElectionClient, ElectionMember};
37use server::rpc_serve;
38pub use server::started::get as is_server_started;
39
40use crate::manager::MetaOpts;
41
// Command-line / environment options of the meta node.
//
// New notes in this struct are plain `//` comments on purpose: clap turns `///` doc comments
// into `--help` text, so adding or editing `///` lines would change the CLI output.
//
// NOTE(review): fields annotated with `#[override_opts(path = ...)]` appear to override the
// config entry at that path when the config is loaded via `load_config(&opts.config_path, &opts)`;
// the exact semantics live in the `OverrideConfig` derive — confirm there.
// `Debug` is derived through `educe` so that individual fields can be excluded from it.
#[derive(Educe, Clone, Parser, OverrideConfig)]
#[educe(Debug)]
#[command(version, about = "The central metadata management service")]
pub struct MetaNodeOpts {
    // Address the meta service listens on; parsed into a socket address at startup and also
    // used (with an `http://` prefix) by `Opts::meta_addr`.
    // TODO: use `SocketAddr`
    #[clap(long, env = "RW_LISTEN_ADDR", default_value = "127.0.0.1:5690")]
    pub listen_addr: String,

    /// The address for contacting this instance of the service.
    /// This would be synonymous with the service's "public address"
    /// or "identifying address".
    /// It will serve as a unique identifier in cluster
    /// membership and leader election. Must be specified for meta backend.
    #[clap(long, env = "RW_ADVERTISE_ADDR", default_value = "127.0.0.1:5690")]
    pub advertise_addr: String,

    // Optional address for the dashboard; when set, it is parsed into a socket address at
    // startup and passed on as the dashboard address.
    #[clap(long, env = "RW_DASHBOARD_HOST")]
    pub dashboard_host: Option<String>,

    /// We will start a http server at this address via `MetricsManager`.
    /// Then the prometheus instance will poll the metrics from this address.
    #[clap(long, env = "RW_PROMETHEUS_HOST", alias = "prometheus-host")]
    pub prometheus_listener_addr: Option<String>,

    /// Endpoint of the SQL service, make it non-option when SQL service is required.
    #[clap(long, hide = true, env = "RW_SQL_ENDPOINT")]
    pub sql_endpoint: Option<Secret<String>>,

    /// Username of sql backend, required when meta backend set to MySQL or PostgreSQL.
    #[clap(long, hide = true, env = "RW_SQL_USERNAME", default_value = "")]
    pub sql_username: String,

    /// Password of sql backend, required when meta backend set to MySQL or PostgreSQL.
    #[clap(long, hide = true, env = "RW_SQL_PASSWORD", default_value = "")]
    pub sql_password: Secret<String>,

    /// Database of sql backend, required when meta backend set to MySQL or PostgreSQL.
    #[clap(long, hide = true, env = "RW_SQL_DATABASE", default_value = "")]
    pub sql_database: String,

    /// The HTTP REST-API address of the Prometheus instance associated to this cluster.
    /// This address is used to serve `PromQL` queries to Prometheus.
    /// It is also used by Grafana Dashboard Service to fetch metrics and visualize them.
    #[clap(long, env = "RW_PROMETHEUS_ENDPOINT")]
    pub prometheus_endpoint: Option<String>,

    /// The additional selector used when querying Prometheus.
    ///
    /// The format is same as `PromQL`. Example: `instance="foo",namespace="bar"`
    #[clap(long, env = "RW_PROMETHEUS_SELECTOR")]
    pub prometheus_selector: Option<String>,

    /// Default tag for the endpoint created when creating a privatelink connection.
    /// Will be appended to the tags specified in the `tags` field in with clause in `create
    /// connection`.
    // Expected as a comma-separated list of `key=value` pairs (that is how it is split at startup).
    #[clap(long, hide = true, env = "RW_PRIVATELINK_ENDPOINT_DEFAULT_TAGS")]
    pub privatelink_endpoint_default_tags: Option<String>,

    #[clap(long, hide = true, env = "RW_VPC_ID")]
    pub vpc_id: Option<String>,

    #[clap(long, hide = true, env = "RW_VPC_SECURITY_GROUP_ID")]
    pub security_group_id: Option<String>,

    /// The path of `risingwave.toml` configuration file.
    ///
    /// If empty, default configuration values will be used.
    #[clap(long, env = "RW_CONFIG_PATH", default_value = "")]
    pub config_path: String,

    #[clap(long, hide = true, env = "RW_BACKEND", value_enum)]
    #[override_opts(path = meta.backend)]
    pub backend: Option<MetaBackend>,

    /// The interval of periodic barrier.
    #[clap(long, hide = true, env = "RW_BARRIER_INTERVAL_MS")]
    #[override_opts(path = system.barrier_interval_ms)]
    pub barrier_interval_ms: Option<u32>,

    /// Target size of the Sstable.
    #[clap(long, hide = true, env = "RW_SSTABLE_SIZE_MB")]
    #[override_opts(path = system.sstable_size_mb)]
    pub sstable_size_mb: Option<u32>,

    /// Size of each block in bytes in SST.
    // NOTE(review): the help text says "bytes", but the field, env var and config path are all
    // `*_kb`; confirm the unit and fix the help text accordingly.
    #[clap(long, hide = true, env = "RW_BLOCK_SIZE_KB")]
    #[override_opts(path = system.block_size_kb)]
    pub block_size_kb: Option<u32>,

    /// False positive probability of bloom filter.
    #[clap(long, hide = true, env = "RW_BLOOM_FALSE_POSITIVE")]
    #[override_opts(path = system.bloom_false_positive)]
    pub bloom_false_positive: Option<f64>,

    /// State store url
    #[clap(long, hide = true, env = "RW_STATE_STORE")]
    #[override_opts(path = system.state_store)]
    pub state_store: Option<String>,

    /// Remote directory for storing data and metadata objects.
    #[clap(long, hide = true, env = "RW_DATA_DIRECTORY")]
    #[override_opts(path = system.data_directory)]
    pub data_directory: Option<String>,

    /// Whether config object storage bucket lifecycle to purge stale data.
    // NOTE(review): the help text reads inverted relative to the flag name. As named, `true`
    // means "do NOT configure the bucket lifecycle" (the lifecycle being what purges stale data).
    #[clap(long, hide = true, env = "RW_DO_NOT_CONFIG_BUCKET_LIFECYCLE")]
    #[override_opts(path = meta.do_not_config_object_storage_lifecycle)]
    pub do_not_config_object_storage_lifecycle: Option<bool>,

    /// Remote storage url for storing snapshots.
    #[clap(long, hide = true, env = "RW_BACKUP_STORAGE_URL")]
    #[override_opts(path = system.backup_storage_url)]
    pub backup_storage_url: Option<String>,

    /// Remote directory for storing snapshots.
    #[clap(long, hide = true, env = "RW_BACKUP_STORAGE_DIRECTORY")]
    #[override_opts(path = system.backup_storage_directory)]
    pub backup_storage_directory: Option<String>,

    /// Enable heap profile dump when memory usage is high.
    // The value is a directory path; it overrides `server.heap_profiling.dir`.
    #[clap(long, hide = true, env = "RW_HEAP_PROFILING_DIR")]
    #[override_opts(path = server.heap_profiling.dir)]
    pub heap_profiling_dir: Option<String>,

    /// Exit if idle for a certain period of time.
    // Read back through `config.meta.dangerous_max_idle_secs`; unset is treated as 0 at startup.
    #[clap(long, hide = true, env = "RW_DANGEROUS_MAX_IDLE_SECS")]
    #[override_opts(path = meta.dangerous_max_idle_secs)]
    pub dangerous_max_idle_secs: Option<u64>,

    /// Endpoint of the connector node.
    // NOTE(review): not read anywhere in this file; presumably kept so that deployments still
    // passing the flag/env var continue to parse.
    #[deprecated = "connector node has been deprecated."]
    #[clap(long, hide = true, env = "RW_CONNECTOR_RPC_ENDPOINT")]
    pub connector_rpc_endpoint: Option<String>,

    /// The license key to activate enterprise features.
    #[clap(long, hide = true, env = "RW_LICENSE_KEY")]
    #[override_opts(path = system.license_key)]
    pub license_key: Option<LicenseKey>,

    /// The path of the license key file to be watched and hot-reloaded.
    #[clap(long, env = "RW_LICENSE_KEY_PATH")]
    pub license_key_path: Option<PathBuf>,

    /// 128-bit AES key for secret store in HEX format.
    // Excluded from `Debug` so the key never shows up in the startup `options` log line.
    #[educe(Debug(ignore))] // TODO: use newtype to redact debug impl
    #[clap(long, hide = true, env = "RW_SECRET_STORE_PRIVATE_KEY_HEX")]
    pub secret_store_private_key_hex: Option<String>,

    /// The path of the temp secret file directory.
    #[clap(
        long,
        hide = true,
        env = "RW_TEMP_SECRET_FILE_DIR",
        default_value = "./secrets"
    )]
    pub temp_secret_file_dir: String,
}
199
200impl risingwave_common::opts::Opts for MetaNodeOpts {
201    fn name() -> &'static str {
202        "meta"
203    }
204
205    fn meta_addr(&self) -> MetaAddressStrategy {
206        format!("http://{}", self.listen_addr)
207            .parse()
208            .expect("invalid listen address")
209    }
210}
211
212use std::future::Future;
213use std::pin::Pin;
214use std::sync::Arc;
215
216use risingwave_common::config::{MetaBackend, RwConfig, load_config};
217use tracing::info;
218
219/// Start meta node
220pub fn start(
221    opts: MetaNodeOpts,
222    shutdown: CancellationToken,
223) -> Pin<Box<dyn Future<Output = ()> + Send>> {
224    // WARNING: don't change the function signature. Making it `async fn` will cause
225    // slow compile in release mode.
226    Box::pin(async move {
227        info!("Starting meta node");
228        info!("> options: {:?}", opts);
229        let config = load_config(&opts.config_path, &opts);
230        info!("> config: {:?}", config);
231        info!("> version: {} ({})", RW_VERSION, GIT_SHA);
232        let listen_addr = opts.listen_addr.parse().unwrap();
233        let dashboard_addr = opts.dashboard_host.map(|x| x.parse().unwrap());
234        let prometheus_addr = opts.prometheus_listener_addr.map(|x| x.parse().unwrap());
235        let meta_store_config = config.meta.meta_store_config.clone();
236        let backend = match config.meta.backend {
237            MetaBackend::Mem => {
238                if opts.sql_endpoint.is_some() {
239                    tracing::warn!("`--sql-endpoint` is ignored when using `mem` backend");
240                }
241                MetaStoreBackend::Mem
242            }
243            MetaBackend::Sql => MetaStoreBackend::Sql {
244                endpoint: opts
245                    .sql_endpoint
246                    .expect("sql endpoint is required")
247                    .expose_secret()
248                    .clone(),
249                config: meta_store_config,
250            },
251            MetaBackend::Sqlite => MetaStoreBackend::Sql {
252                endpoint: format!(
253                    "sqlite://{}?mode=rwc",
254                    opts.sql_endpoint
255                        .expect("sql endpoint is required")
256                        .expose_secret()
257                ),
258                config: meta_store_config,
259            },
260            MetaBackend::Postgres => MetaStoreBackend::Sql {
261                endpoint: format!(
262                    "postgres://{}:{}@{}/{}",
263                    opts.sql_username,
264                    opts.sql_password.expose_secret(),
265                    opts.sql_endpoint
266                        .expect("sql endpoint is required")
267                        .expose_secret(),
268                    opts.sql_database
269                ),
270                config: meta_store_config,
271            },
272            MetaBackend::Mysql => MetaStoreBackend::Sql {
273                endpoint: format!(
274                    "mysql://{}:{}@{}/{}",
275                    opts.sql_username,
276                    opts.sql_password.expose_secret(),
277                    opts.sql_endpoint
278                        .expect("sql endpoint is required")
279                        .expose_secret(),
280                    opts.sql_database
281                ),
282                config: meta_store_config,
283            },
284        };
285        validate_config(&config);
286
287        let total_memory_bytes = resource_util::memory::system_memory_available_bytes();
288        let heap_profiler =
289            HeapProfiler::new(total_memory_bytes, config.server.heap_profiling.clone());
290        // Run a background heap profiler
291        heap_profiler.start();
292
293        let secret_store_private_key = opts
294            .secret_store_private_key_hex
295            .map(|key| hex::decode(key).unwrap());
296        let max_heartbeat_interval =
297            Duration::from_secs(config.meta.max_heartbeat_interval_secs as u64);
298        let max_idle_ms = config.meta.dangerous_max_idle_secs.unwrap_or(0) * 1000;
299        let in_flight_barrier_nums = config.streaming.in_flight_barrier_nums;
300        let privatelink_endpoint_default_tags =
301            opts.privatelink_endpoint_default_tags.map(|tags| {
302                tags.split(',')
303                    .map(|s| {
304                        let key_val = s.split_once('=').unwrap();
305                        (key_val.0.to_owned(), key_val.1.to_owned())
306                    })
307                    .collect()
308            });
309
310        let add_info = AddressInfo {
311            advertise_addr: opts.advertise_addr.to_owned(),
312            listen_addr,
313            prometheus_addr,
314            dashboard_addr,
315        };
316
317        const MIN_TIMEOUT_INTERVAL_SEC: u64 = 20;
318        let compaction_task_max_progress_interval_secs = {
319            let retry_config = &config.storage.object_store.retry;
320            let max_streming_read_timeout_ms = (retry_config.streaming_read_attempt_timeout_ms
321                + retry_config.req_backoff_max_delay_ms)
322                * retry_config.streaming_read_retry_attempts as u64;
323            let max_streaming_upload_timeout_ms = (retry_config
324                .streaming_upload_attempt_timeout_ms
325                + retry_config.req_backoff_max_delay_ms)
326                * retry_config.streaming_upload_retry_attempts as u64;
327            let max_upload_timeout_ms = (retry_config.upload_attempt_timeout_ms
328                + retry_config.req_backoff_max_delay_ms)
329                * retry_config.upload_retry_attempts as u64;
330            let max_read_timeout_ms = (retry_config.read_attempt_timeout_ms
331                + retry_config.req_backoff_max_delay_ms)
332                * retry_config.read_retry_attempts as u64;
333            let max_timeout_ms = max_streming_read_timeout_ms
334                .max(max_upload_timeout_ms)
335                .max(max_streaming_upload_timeout_ms)
336                .max(max_read_timeout_ms)
337                .max(config.meta.compaction_task_max_progress_interval_secs * 1000);
338            max_timeout_ms / 1000
339        } + MIN_TIMEOUT_INTERVAL_SEC;
340
341        rpc_serve(
342            add_info,
343            backend,
344            max_heartbeat_interval,
345            config.meta.meta_leader_lease_secs,
346            MetaOpts {
347                enable_recovery: !config.meta.disable_recovery,
348                disable_automatic_parallelism_control: config
349                    .meta
350                    .disable_automatic_parallelism_control,
351                parallelism_control_batch_size: config.meta.parallelism_control_batch_size,
352                parallelism_control_trigger_period_sec: config
353                    .meta
354                    .parallelism_control_trigger_period_sec,
355                parallelism_control_trigger_first_delay_sec: config
356                    .meta
357                    .parallelism_control_trigger_first_delay_sec,
358                in_flight_barrier_nums,
359                max_idle_ms,
360                compaction_deterministic_test: config.meta.enable_compaction_deterministic,
361                default_parallelism: config.meta.default_parallelism,
362                vacuum_interval_sec: config.meta.vacuum_interval_sec,
363                time_travel_vacuum_interval_sec: config
364                    .meta
365                    .developer
366                    .time_travel_vacuum_interval_sec,
367                vacuum_spin_interval_ms: config.meta.vacuum_spin_interval_ms,
368                hummock_version_checkpoint_interval_sec: config
369                    .meta
370                    .hummock_version_checkpoint_interval_sec,
371                enable_hummock_data_archive: config.meta.enable_hummock_data_archive,
372                hummock_time_travel_snapshot_interval: config
373                    .meta
374                    .hummock_time_travel_snapshot_interval,
375                hummock_time_travel_sst_info_fetch_batch_size: config
376                    .meta
377                    .developer
378                    .hummock_time_travel_sst_info_fetch_batch_size,
379                hummock_time_travel_sst_info_insert_batch_size: config
380                    .meta
381                    .developer
382                    .hummock_time_travel_sst_info_insert_batch_size,
383                hummock_time_travel_epoch_version_insert_batch_size: config
384                    .meta
385                    .developer
386                    .hummock_time_travel_epoch_version_insert_batch_size,
387                hummock_gc_history_insert_batch_size: config
388                    .meta
389                    .developer
390                    .hummock_gc_history_insert_batch_size,
391                hummock_time_travel_filter_out_objects_batch_size: config
392                    .meta
393                    .developer
394                    .hummock_time_travel_filter_out_objects_batch_size,
395                hummock_time_travel_filter_out_objects_v1: config
396                    .meta
397                    .developer
398                    .hummock_time_travel_filter_out_objects_v1,
399                hummock_time_travel_filter_out_objects_list_version_batch_size: config
400                    .meta
401                    .developer
402                    .hummock_time_travel_filter_out_objects_list_version_batch_size,
403                hummock_time_travel_filter_out_objects_list_delta_batch_size: config
404                    .meta
405                    .developer
406                    .hummock_time_travel_filter_out_objects_list_delta_batch_size,
407                min_delta_log_num_for_hummock_version_checkpoint: config
408                    .meta
409                    .min_delta_log_num_for_hummock_version_checkpoint,
410                min_sst_retention_time_sec: config.meta.min_sst_retention_time_sec,
411                full_gc_interval_sec: config.meta.full_gc_interval_sec,
412                full_gc_object_limit: config.meta.full_gc_object_limit,
413                gc_history_retention_time_sec: config.meta.gc_history_retention_time_sec,
414                max_inflight_time_travel_query: config.meta.max_inflight_time_travel_query,
415                enable_committed_sst_sanity_check: config.meta.enable_committed_sst_sanity_check,
416                periodic_compaction_interval_sec: config.meta.periodic_compaction_interval_sec,
417                node_num_monitor_interval_sec: config.meta.node_num_monitor_interval_sec,
418                protect_drop_table_with_incoming_sink: config
419                    .meta
420                    .protect_drop_table_with_incoming_sink,
421                prometheus_endpoint: opts.prometheus_endpoint,
422                prometheus_selector: opts.prometheus_selector,
423                vpc_id: opts.vpc_id,
424                security_group_id: opts.security_group_id,
425                privatelink_endpoint_default_tags,
426                periodic_space_reclaim_compaction_interval_sec: config
427                    .meta
428                    .periodic_space_reclaim_compaction_interval_sec,
429                telemetry_enabled: config.server.telemetry_enabled,
430                periodic_ttl_reclaim_compaction_interval_sec: config
431                    .meta
432                    .periodic_ttl_reclaim_compaction_interval_sec,
433                periodic_tombstone_reclaim_compaction_interval_sec: config
434                    .meta
435                    .periodic_tombstone_reclaim_compaction_interval_sec,
436                periodic_scheduling_compaction_group_split_interval_sec: config
437                    .meta
438                    .periodic_scheduling_compaction_group_split_interval_sec,
439                periodic_scheduling_compaction_group_merge_interval_sec: config
440                    .meta
441                    .periodic_scheduling_compaction_group_merge_interval_sec,
442                compaction_group_merge_dimension_threshold: config
443                    .meta
444                    .compaction_group_merge_dimension_threshold,
445                table_high_write_throughput_threshold: config
446                    .meta
447                    .table_high_write_throughput_threshold,
448                table_low_write_throughput_threshold: config
449                    .meta
450                    .table_low_write_throughput_threshold,
451                partition_vnode_count: config.meta.partition_vnode_count,
452                compact_task_table_size_partition_threshold_low: config
453                    .meta
454                    .compact_task_table_size_partition_threshold_low,
455                compact_task_table_size_partition_threshold_high: config
456                    .meta
457                    .compact_task_table_size_partition_threshold_high,
458                do_not_config_object_storage_lifecycle: config
459                    .meta
460                    .do_not_config_object_storage_lifecycle,
461                compaction_task_max_heartbeat_interval_secs: config
462                    .meta
463                    .compaction_task_max_heartbeat_interval_secs,
464                compaction_task_max_progress_interval_secs,
465                compaction_config: Some(config.meta.compaction_config),
466                hybrid_partition_node_count: config.meta.hybrid_partition_vnode_count,
467                event_log_enabled: config.meta.event_log_enabled,
468                event_log_channel_max_size: config.meta.event_log_channel_max_size,
469                advertise_addr: opts.advertise_addr,
470                cached_traces_num: config.meta.developer.cached_traces_num,
471                cached_traces_memory_limit_bytes: config
472                    .meta
473                    .developer
474                    .cached_traces_memory_limit_bytes,
475                enable_trivial_move: config.meta.developer.enable_trivial_move,
476                enable_check_task_level_overlap: config
477                    .meta
478                    .developer
479                    .enable_check_task_level_overlap,
480                enable_dropped_column_reclaim: config.meta.enable_dropped_column_reclaim,
481                split_group_size_ratio: config.meta.split_group_size_ratio,
482                table_stat_high_write_throughput_ratio_for_split: config
483                    .meta
484                    .table_stat_high_write_throughput_ratio_for_split,
485                table_stat_low_write_throughput_ratio_for_merge: config
486                    .meta
487                    .table_stat_low_write_throughput_ratio_for_merge,
488                table_stat_throuput_window_seconds_for_split: config
489                    .meta
490                    .table_stat_throuput_window_seconds_for_split,
491                table_stat_throuput_window_seconds_for_merge: config
492                    .meta
493                    .table_stat_throuput_window_seconds_for_merge,
494                object_store_config: config.storage.object_store,
495                max_trivial_move_task_count_per_loop: config
496                    .meta
497                    .developer
498                    .max_trivial_move_task_count_per_loop,
499                max_get_task_probe_times: config.meta.developer.max_get_task_probe_times,
500                secret_store_private_key,
501                temp_secret_file_dir: opts.temp_secret_file_dir,
502                actor_cnt_per_worker_parallelism_hard_limit: config
503                    .meta
504                    .developer
505                    .actor_cnt_per_worker_parallelism_hard_limit,
506                actor_cnt_per_worker_parallelism_soft_limit: config
507                    .meta
508                    .developer
509                    .actor_cnt_per_worker_parallelism_soft_limit,
510                license_key_path: opts.license_key_path,
511                compute_client_config: config.meta.developer.compute_client_config.clone(),
512                stream_client_config: config.meta.developer.stream_client_config.clone(),
513                frontend_client_config: config.meta.developer.frontend_client_config.clone(),
514                redact_sql_option_keywords: Arc::new(
515                    config
516                        .batch
517                        .redact_sql_option_keywords
518                        .into_iter()
519                        .collect(),
520                ),
521            },
522            config.system.into_init_system_params(),
523            Default::default(),
524            shutdown,
525        )
526        .await
527        .unwrap();
528    })
529}
530
531fn validate_config(config: &RwConfig) {
532    if config.meta.meta_leader_lease_secs <= 2 {
533        let error_msg = "meta leader lease secs should be larger than 2";
534        tracing::error!(error_msg);
535        panic!("{}", error_msg);
536    }
537
538    if config.meta.parallelism_control_batch_size == 0 {
539        let error_msg = "parallelism control batch size should be larger than 0";
540        tracing::error!(error_msg);
541        panic!("{}", error_msg);
542    }
543}