risingwave_meta_node/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![feature(coverage_attribute)]
16
17mod server;
18
19use std::path::PathBuf;
20use std::time::Duration;
21
22use clap::Parser;
23use educe::Educe;
24pub use error::{MetaError, MetaResult};
25use redact::Secret;
26use risingwave_common::config::OverrideConfig;
27use risingwave_common::license::LicenseKey;
28use risingwave_common::util::meta_addr::MetaAddressStrategy;
29use risingwave_common::util::resource_util;
30use risingwave_common::util::tokio_util::sync::CancellationToken;
31use risingwave_common::{GIT_SHA, RW_VERSION};
32use risingwave_common_heap_profiling::HeapProfiler;
33use risingwave_meta::*;
34use risingwave_meta_service::*;
35pub use rpc::{ElectionClient, ElectionMember};
36use server::rpc_serve;
37pub use server::started::get as is_server_started;
38
39use crate::manager::MetaOpts;
40
41#[derive(Educe, Clone, Parser, OverrideConfig)]
42#[educe(Debug)]
43#[command(version, about = "The central metadata management service")]
44pub struct MetaNodeOpts {
45    // TODO: use `SocketAddr`
46    #[clap(long, env = "RW_LISTEN_ADDR", default_value = "127.0.0.1:5690")]
47    pub listen_addr: String,
48
49    /// The address for contacting this instance of the service.
50    /// This would be synonymous with the service's "public address"
51    /// or "identifying address".
52    /// It will serve as a unique identifier in cluster
53    /// membership and leader election. Must be specified for meta backend.
54    #[clap(long, env = "RW_ADVERTISE_ADDR", default_value = "127.0.0.1:5690")]
55    pub advertise_addr: String,
56
57    #[clap(long, env = "RW_DASHBOARD_HOST")]
58    pub dashboard_host: Option<String>,
59
60    /// We will start a http server at this address via `MetricsManager`.
61    /// Then the prometheus instance will poll the metrics from this address.
62    #[clap(long, env = "RW_PROMETHEUS_HOST", alias = "prometheus-host")]
63    pub prometheus_listener_addr: Option<String>,
64
65    /// Endpoint of the SQL service, make it non-option when SQL service is required.
66    #[clap(long, hide = true, env = "RW_SQL_ENDPOINT")]
67    pub sql_endpoint: Option<Secret<String>>,
68
69    /// Username of sql backend, required when meta backend set to MySQL or PostgreSQL.
70    #[clap(long, hide = true, env = "RW_SQL_USERNAME", default_value = "")]
71    pub sql_username: String,
72
73    /// Password of sql backend, required when meta backend set to MySQL or PostgreSQL.
74    #[clap(long, hide = true, env = "RW_SQL_PASSWORD", default_value = "")]
75    pub sql_password: Secret<String>,
76
77    /// Database of sql backend, required when meta backend set to MySQL or PostgreSQL.
78    #[clap(long, hide = true, env = "RW_SQL_DATABASE", default_value = "")]
79    pub sql_database: String,
80
81    /// Params for the URL connection, such as `sslmode=disable`.
82    /// Example: `param1=value1&param2=value2`
83    #[clap(long, hide = true, env = "RW_SQL_URL_PARAMS")]
84    pub sql_url_params: Option<String>,
85
86    /// The HTTP REST-API address of the Prometheus instance associated to this cluster.
87    /// This address is used to serve `PromQL` queries to Prometheus.
88    /// It is also used by Grafana Dashboard Service to fetch metrics and visualize them.
89    #[clap(long, env = "RW_PROMETHEUS_ENDPOINT")]
90    pub prometheus_endpoint: Option<String>,
91
92    /// The additional selector used when querying Prometheus.
93    ///
94    /// The format is same as `PromQL`. Example: `instance="foo",namespace="bar"`
95    #[clap(long, env = "RW_PROMETHEUS_SELECTOR")]
96    pub prometheus_selector: Option<String>,
97
98    /// Default tag for the endpoint created when creating a privatelink connection.
99    /// Will be appended to the tags specified in the `tags` field in with clause in `create
100    /// connection`.
101    #[clap(long, hide = true, env = "RW_PRIVATELINK_ENDPOINT_DEFAULT_TAGS")]
102    pub privatelink_endpoint_default_tags: Option<String>,
103
104    #[clap(long, hide = true, env = "RW_VPC_ID")]
105    pub vpc_id: Option<String>,
106
107    #[clap(long, hide = true, env = "RW_VPC_SECURITY_GROUP_ID")]
108    pub security_group_id: Option<String>,
109
110    /// The path of `risingwave.toml` configuration file.
111    ///
112    /// If empty, default configuration values will be used.
113    #[clap(long, env = "RW_CONFIG_PATH", default_value = "")]
114    pub config_path: String,
115
116    #[clap(long, hide = true, env = "RW_BACKEND", value_enum)]
117    #[override_opts(path = meta.backend)]
118    pub backend: Option<MetaBackend>,
119
120    /// The interval of periodic barrier.
121    #[clap(long, hide = true, env = "RW_BARRIER_INTERVAL_MS")]
122    #[override_opts(path = system.barrier_interval_ms)]
123    pub barrier_interval_ms: Option<u32>,
124
125    /// Target size of the Sstable.
126    #[clap(long, hide = true, env = "RW_SSTABLE_SIZE_MB")]
127    #[override_opts(path = system.sstable_size_mb)]
128    pub sstable_size_mb: Option<u32>,
129
130    /// Size of each block in bytes in SST.
131    #[clap(long, hide = true, env = "RW_BLOCK_SIZE_KB")]
132    #[override_opts(path = system.block_size_kb)]
133    pub block_size_kb: Option<u32>,
134
135    /// False positive probability of bloom filter.
136    #[clap(long, hide = true, env = "RW_BLOOM_FALSE_POSITIVE")]
137    #[override_opts(path = system.bloom_false_positive)]
138    pub bloom_false_positive: Option<f64>,
139
140    /// State store url
141    #[clap(long, hide = true, env = "RW_STATE_STORE")]
142    #[override_opts(path = system.state_store)]
143    pub state_store: Option<String>,
144
145    /// Remote directory for storing data and metadata objects.
146    #[clap(long, hide = true, env = "RW_DATA_DIRECTORY")]
147    #[override_opts(path = system.data_directory)]
148    pub data_directory: Option<String>,
149
150    /// Whether config object storage bucket lifecycle to purge stale data.
151    #[clap(long, hide = true, env = "RW_DO_NOT_CONFIG_BUCKET_LIFECYCLE")]
152    #[override_opts(path = meta.do_not_config_object_storage_lifecycle)]
153    pub do_not_config_object_storage_lifecycle: Option<bool>,
154
155    /// Remote storage url for storing snapshots.
156    #[clap(long, hide = true, env = "RW_BACKUP_STORAGE_URL")]
157    #[override_opts(path = system.backup_storage_url)]
158    pub backup_storage_url: Option<String>,
159
160    /// Remote directory for storing snapshots.
161    #[clap(long, hide = true, env = "RW_BACKUP_STORAGE_DIRECTORY")]
162    #[override_opts(path = system.backup_storage_directory)]
163    pub backup_storage_directory: Option<String>,
164
165    /// Enable heap profile dump when memory usage is high.
166    #[clap(long, hide = true, env = "RW_HEAP_PROFILING_DIR")]
167    #[override_opts(path = server.heap_profiling.dir)]
168    pub heap_profiling_dir: Option<String>,
169
170    /// Exit if idle for a certain period of time.
171    #[clap(long, hide = true, env = "RW_DANGEROUS_MAX_IDLE_SECS")]
172    #[override_opts(path = meta.dangerous_max_idle_secs)]
173    pub dangerous_max_idle_secs: Option<u64>,
174
175    /// Endpoint of the connector node.
176    #[deprecated = "connector node has been deprecated."]
177    #[clap(long, hide = true, env = "RW_CONNECTOR_RPC_ENDPOINT")]
178    pub connector_rpc_endpoint: Option<String>,
179
180    /// The license key to activate enterprise features.
181    #[clap(long, hide = true, env = "RW_LICENSE_KEY")]
182    #[override_opts(path = system.license_key)]
183    pub license_key: Option<LicenseKey>,
184
185    /// The path of the license key file to be watched and hot-reloaded.
186    #[clap(long, env = "RW_LICENSE_KEY_PATH")]
187    pub license_key_path: Option<PathBuf>,
188
189    /// 128-bit AES key for secret store in HEX format.
190    #[educe(Debug(ignore))] // TODO: use newtype to redact debug impl
191    #[clap(long, hide = true, env = "RW_SECRET_STORE_PRIVATE_KEY_HEX")]
192    pub secret_store_private_key_hex: Option<String>,
193
194    /// The path of the temp secret file directory.
195    #[clap(
196        long,
197        hide = true,
198        env = "RW_TEMP_SECRET_FILE_DIR",
199        default_value = "./secrets"
200    )]
201    pub temp_secret_file_dir: String,
202}
203
204impl risingwave_common::opts::Opts for MetaNodeOpts {
205    fn name() -> &'static str {
206        "meta"
207    }
208
209    fn meta_addr(&self) -> MetaAddressStrategy {
210        format!("http://{}", self.listen_addr)
211            .parse()
212            .expect("invalid listen address")
213    }
214}
215
216use std::future::Future;
217use std::pin::Pin;
218use std::sync::Arc;
219
220use risingwave_common::config::{MetaBackend, RwConfig, load_config};
221use tracing::info;
222
223/// Start meta node
224pub fn start(
225    opts: MetaNodeOpts,
226    shutdown: CancellationToken,
227) -> Pin<Box<dyn Future<Output = ()> + Send>> {
228    // WARNING: don't change the function signature. Making it `async fn` will cause
229    // slow compile in release mode.
230    Box::pin(async move {
231        info!("Starting meta node");
232        info!("> options: {:?}", opts);
233        let config = load_config(&opts.config_path, &opts);
234        info!("> config: {:?}", config);
235        info!("> version: {} ({})", RW_VERSION, GIT_SHA);
236        let listen_addr = opts.listen_addr.parse().unwrap();
237        let dashboard_addr = opts.dashboard_host.map(|x| x.parse().unwrap());
238        let prometheus_addr = opts.prometheus_listener_addr.map(|x| x.parse().unwrap());
239        let meta_store_config = config.meta.meta_store_config.clone();
240        let backend = match config.meta.backend {
241            MetaBackend::Mem => {
242                if opts.sql_endpoint.is_some() {
243                    tracing::warn!("`--sql-endpoint` is ignored when using `mem` backend");
244                }
245                MetaStoreBackend::Mem
246            }
247            MetaBackend::Sql => MetaStoreBackend::Sql {
248                endpoint: opts
249                    .sql_endpoint
250                    .expect("sql endpoint is required")
251                    .expose_secret()
252                    .clone(),
253                config: meta_store_config,
254            },
255            MetaBackend::Sqlite => MetaStoreBackend::Sql {
256                endpoint: format!(
257                    "sqlite://{}?mode=rwc",
258                    opts.sql_endpoint
259                        .expect("sql endpoint is required")
260                        .expose_secret()
261                ),
262                config: meta_store_config,
263            },
264            MetaBackend::Postgres => MetaStoreBackend::Sql {
265                endpoint: format!(
266                    "postgres://{}:{}@{}/{}{}",
267                    opts.sql_username,
268                    opts.sql_password.expose_secret(),
269                    opts.sql_endpoint
270                        .expect("sql endpoint is required")
271                        .expose_secret(),
272                    opts.sql_database,
273                    if let Some(params) = &opts.sql_url_params
274                        && !params.is_empty()
275                    {
276                        format!("?{}", params)
277                    } else {
278                        "".to_owned()
279                    }
280                ),
281                config: meta_store_config,
282            },
283            MetaBackend::Mysql => MetaStoreBackend::Sql {
284                endpoint: format!(
285                    "mysql://{}:{}@{}/{}{}",
286                    opts.sql_username,
287                    opts.sql_password.expose_secret(),
288                    opts.sql_endpoint
289                        .expect("sql endpoint is required")
290                        .expose_secret(),
291                    opts.sql_database,
292                    if let Some(params) = &opts.sql_url_params
293                        && !params.is_empty()
294                    {
295                        format!("?{}", params)
296                    } else {
297                        "".to_owned()
298                    }
299                ),
300                config: meta_store_config,
301            },
302        };
303        validate_config(&config);
304
305        let total_memory_bytes = resource_util::memory::system_memory_available_bytes();
306        let heap_profiler =
307            HeapProfiler::new(total_memory_bytes, config.server.heap_profiling.clone());
308        // Run a background heap profiler
309        heap_profiler.start();
310
311        let secret_store_private_key = opts
312            .secret_store_private_key_hex
313            .map(|key| hex::decode(key).unwrap());
314        let max_heartbeat_interval =
315            Duration::from_secs(config.meta.max_heartbeat_interval_secs as u64);
316        let max_idle_ms = config.meta.dangerous_max_idle_secs.unwrap_or(0) * 1000;
317        let in_flight_barrier_nums = config.streaming.in_flight_barrier_nums;
318        let privatelink_endpoint_default_tags =
319            opts.privatelink_endpoint_default_tags.map(|tags| {
320                tags.split(',')
321                    .map(|s| {
322                        let key_val = s.split_once('=').unwrap();
323                        (key_val.0.to_owned(), key_val.1.to_owned())
324                    })
325                    .collect()
326            });
327
328        let add_info = AddressInfo {
329            advertise_addr: opts.advertise_addr.clone(),
330            listen_addr,
331            prometheus_addr,
332            dashboard_addr,
333        };
334
335        const MIN_TIMEOUT_INTERVAL_SEC: u64 = 20;
336        let compaction_task_max_progress_interval_secs = {
337            let retry_config = &config.storage.object_store.retry;
338            let max_streaming_read_timeout_ms = (retry_config.streaming_read_attempt_timeout_ms
339                + retry_config.req_backoff_max_delay_ms)
340                * retry_config.streaming_read_retry_attempts as u64;
341            let max_streaming_upload_timeout_ms = (retry_config
342                .streaming_upload_attempt_timeout_ms
343                + retry_config.req_backoff_max_delay_ms)
344                * retry_config.streaming_upload_retry_attempts as u64;
345            let max_upload_timeout_ms = (retry_config.upload_attempt_timeout_ms
346                + retry_config.req_backoff_max_delay_ms)
347                * retry_config.upload_retry_attempts as u64;
348            let max_read_timeout_ms = (retry_config.read_attempt_timeout_ms
349                + retry_config.req_backoff_max_delay_ms)
350                * retry_config.read_retry_attempts as u64;
351            let max_timeout_ms = max_streaming_read_timeout_ms
352                .max(max_upload_timeout_ms)
353                .max(max_streaming_upload_timeout_ms)
354                .max(max_read_timeout_ms)
355                .max(config.meta.compaction_task_max_progress_interval_secs * 1000);
356            max_timeout_ms / 1000
357        } + MIN_TIMEOUT_INTERVAL_SEC;
358
359        rpc_serve(
360            add_info,
361            backend,
362            max_heartbeat_interval,
363            config.meta.meta_leader_lease_secs,
364            MetaOpts {
365                enable_recovery: !config.meta.disable_recovery,
366                disable_automatic_parallelism_control: config
367                    .meta
368                    .disable_automatic_parallelism_control,
369                parallelism_control_batch_size: config.meta.parallelism_control_batch_size,
370                parallelism_control_trigger_period_sec: config
371                    .meta
372                    .parallelism_control_trigger_period_sec,
373                parallelism_control_trigger_first_delay_sec: config
374                    .meta
375                    .parallelism_control_trigger_first_delay_sec,
376                in_flight_barrier_nums,
377                max_idle_ms,
378                compaction_deterministic_test: config.meta.enable_compaction_deterministic,
379                default_parallelism: config.meta.default_parallelism,
380                vacuum_interval_sec: config.meta.vacuum_interval_sec,
381                time_travel_vacuum_interval_sec: config
382                    .meta
383                    .developer
384                    .time_travel_vacuum_interval_sec,
385                vacuum_spin_interval_ms: config.meta.vacuum_spin_interval_ms,
386                iceberg_gc_interval_sec: config.meta.iceberg_gc_interval_sec,
387                hummock_version_checkpoint_interval_sec: config
388                    .meta
389                    .hummock_version_checkpoint_interval_sec,
390                enable_hummock_data_archive: config.meta.enable_hummock_data_archive,
391                hummock_time_travel_snapshot_interval: config
392                    .meta
393                    .hummock_time_travel_snapshot_interval,
394                hummock_time_travel_sst_info_fetch_batch_size: config
395                    .meta
396                    .developer
397                    .hummock_time_travel_sst_info_fetch_batch_size,
398                hummock_time_travel_sst_info_insert_batch_size: config
399                    .meta
400                    .developer
401                    .hummock_time_travel_sst_info_insert_batch_size,
402                hummock_time_travel_epoch_version_insert_batch_size: config
403                    .meta
404                    .developer
405                    .hummock_time_travel_epoch_version_insert_batch_size,
406                hummock_gc_history_insert_batch_size: config
407                    .meta
408                    .developer
409                    .hummock_gc_history_insert_batch_size,
410                hummock_time_travel_filter_out_objects_batch_size: config
411                    .meta
412                    .developer
413                    .hummock_time_travel_filter_out_objects_batch_size,
414                hummock_time_travel_filter_out_objects_v1: config
415                    .meta
416                    .developer
417                    .hummock_time_travel_filter_out_objects_v1,
418                hummock_time_travel_filter_out_objects_list_version_batch_size: config
419                    .meta
420                    .developer
421                    .hummock_time_travel_filter_out_objects_list_version_batch_size,
422                hummock_time_travel_filter_out_objects_list_delta_batch_size: config
423                    .meta
424                    .developer
425                    .hummock_time_travel_filter_out_objects_list_delta_batch_size,
426                min_delta_log_num_for_hummock_version_checkpoint: config
427                    .meta
428                    .min_delta_log_num_for_hummock_version_checkpoint,
429                min_sst_retention_time_sec: config.meta.min_sst_retention_time_sec,
430                full_gc_interval_sec: config.meta.full_gc_interval_sec,
431                full_gc_object_limit: config.meta.full_gc_object_limit,
432                gc_history_retention_time_sec: config.meta.gc_history_retention_time_sec,
433                max_inflight_time_travel_query: config.meta.max_inflight_time_travel_query,
434                enable_committed_sst_sanity_check: config.meta.enable_committed_sst_sanity_check,
435                periodic_compaction_interval_sec: config.meta.periodic_compaction_interval_sec,
436                node_num_monitor_interval_sec: config.meta.node_num_monitor_interval_sec,
437                protect_drop_table_with_incoming_sink: config
438                    .meta
439                    .protect_drop_table_with_incoming_sink,
440                prometheus_endpoint: opts.prometheus_endpoint,
441                prometheus_selector: opts.prometheus_selector,
442                vpc_id: opts.vpc_id,
443                security_group_id: opts.security_group_id,
444                privatelink_endpoint_default_tags,
445                periodic_space_reclaim_compaction_interval_sec: config
446                    .meta
447                    .periodic_space_reclaim_compaction_interval_sec,
448                telemetry_enabled: config.server.telemetry_enabled,
449                periodic_ttl_reclaim_compaction_interval_sec: config
450                    .meta
451                    .periodic_ttl_reclaim_compaction_interval_sec,
452                periodic_tombstone_reclaim_compaction_interval_sec: config
453                    .meta
454                    .periodic_tombstone_reclaim_compaction_interval_sec,
455                periodic_scheduling_compaction_group_split_interval_sec: config
456                    .meta
457                    .periodic_scheduling_compaction_group_split_interval_sec,
458                periodic_scheduling_compaction_group_merge_interval_sec: config
459                    .meta
460                    .periodic_scheduling_compaction_group_merge_interval_sec,
461                compaction_group_merge_dimension_threshold: config
462                    .meta
463                    .compaction_group_merge_dimension_threshold,
464                table_high_write_throughput_threshold: config
465                    .meta
466                    .table_high_write_throughput_threshold,
467                table_low_write_throughput_threshold: config
468                    .meta
469                    .table_low_write_throughput_threshold,
470                partition_vnode_count: config.meta.partition_vnode_count,
471                compact_task_table_size_partition_threshold_low: config
472                    .meta
473                    .compact_task_table_size_partition_threshold_low,
474                compact_task_table_size_partition_threshold_high: config
475                    .meta
476                    .compact_task_table_size_partition_threshold_high,
477                do_not_config_object_storage_lifecycle: config
478                    .meta
479                    .do_not_config_object_storage_lifecycle,
480                compaction_task_max_heartbeat_interval_secs: config
481                    .meta
482                    .compaction_task_max_heartbeat_interval_secs,
483                compaction_task_max_progress_interval_secs,
484                compaction_config: Some(config.meta.compaction_config),
485                hybrid_partition_node_count: config.meta.hybrid_partition_vnode_count,
486                event_log_enabled: config.meta.event_log_enabled,
487                event_log_channel_max_size: config.meta.event_log_channel_max_size,
488                advertise_addr: opts.advertise_addr,
489                cached_traces_num: config.meta.developer.cached_traces_num,
490                cached_traces_memory_limit_bytes: config
491                    .meta
492                    .developer
493                    .cached_traces_memory_limit_bytes,
494                enable_trivial_move: config.meta.developer.enable_trivial_move,
495                enable_check_task_level_overlap: config
496                    .meta
497                    .developer
498                    .enable_check_task_level_overlap,
499                enable_dropped_column_reclaim: config.meta.enable_dropped_column_reclaim,
500                split_group_size_ratio: config.meta.split_group_size_ratio,
501                table_stat_high_write_throughput_ratio_for_split: config
502                    .meta
503                    .table_stat_high_write_throughput_ratio_for_split,
504                table_stat_low_write_throughput_ratio_for_merge: config
505                    .meta
506                    .table_stat_low_write_throughput_ratio_for_merge,
507                table_stat_throuput_window_seconds_for_split: config
508                    .meta
509                    .table_stat_throuput_window_seconds_for_split,
510                table_stat_throuput_window_seconds_for_merge: config
511                    .meta
512                    .table_stat_throuput_window_seconds_for_merge,
513                object_store_config: config.storage.object_store,
514                max_trivial_move_task_count_per_loop: config
515                    .meta
516                    .developer
517                    .max_trivial_move_task_count_per_loop,
518                max_get_task_probe_times: config.meta.developer.max_get_task_probe_times,
519                secret_store_private_key,
520                temp_secret_file_dir: opts.temp_secret_file_dir,
521                actor_cnt_per_worker_parallelism_hard_limit: config
522                    .meta
523                    .developer
524                    .actor_cnt_per_worker_parallelism_hard_limit,
525                actor_cnt_per_worker_parallelism_soft_limit: config
526                    .meta
527                    .developer
528                    .actor_cnt_per_worker_parallelism_soft_limit,
529                license_key_path: opts.license_key_path,
530                compute_client_config: config.meta.developer.compute_client_config.clone(),
531                stream_client_config: config.meta.developer.stream_client_config.clone(),
532                frontend_client_config: config.meta.developer.frontend_client_config.clone(),
533                redact_sql_option_keywords: Arc::new(
534                    config
535                        .batch
536                        .redact_sql_option_keywords
537                        .into_iter()
538                        .collect(),
539                ),
540                cdc_table_split_init_sleep_interval_splits: config
541                    .meta
542                    .cdc_table_split_init_sleep_interval_splits,
543                cdc_table_split_init_sleep_duration_millis: config
544                    .meta
545                    .cdc_table_split_init_sleep_duration_millis,
546                cdc_table_split_init_insert_batch_size: config
547                    .meta
548                    .cdc_table_split_init_insert_batch_size,
549            },
550            config.system.into_init_system_params(),
551            Default::default(),
552            shutdown,
553        )
554        .await
555        .unwrap();
556    })
557}
558
559fn validate_config(config: &RwConfig) {
560    if config.meta.meta_leader_lease_secs <= 2 {
561        let error_msg = "meta leader lease secs should be larger than 2";
562        tracing::error!(error_msg);
563        panic!("{}", error_msg);
564    }
565
566    if config.meta.parallelism_control_batch_size == 0 {
567        let error_msg = "parallelism control batch size should be larger than 0";
568        tracing::error!(error_msg);
569        panic!("{}", error_msg);
570    }
571}