risingwave_meta_node/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![feature(let_chains)]
16#![feature(coverage_attribute)]
17
18mod server;
19
20use std::path::PathBuf;
21use std::time::Duration;
22
23use clap::Parser;
24use educe::Educe;
25pub use error::{MetaError, MetaResult};
26use redact::Secret;
27use risingwave_common::config::OverrideConfig;
28use risingwave_common::license::LicenseKey;
29use risingwave_common::util::meta_addr::MetaAddressStrategy;
30use risingwave_common::util::resource_util;
31use risingwave_common::util::tokio_util::sync::CancellationToken;
32use risingwave_common::{GIT_SHA, RW_VERSION};
33use risingwave_common_heap_profiling::HeapProfiler;
34use risingwave_meta::*;
35use risingwave_meta_service::*;
36pub use rpc::{ElectionClient, ElectionMember};
37use server::rpc_serve;
38pub use server::started::get as is_server_started;
39
40use crate::manager::MetaOpts;
41
42#[derive(Educe, Clone, Parser, OverrideConfig)]
43#[educe(Debug)]
44#[command(version, about = "The central metadata management service")]
45pub struct MetaNodeOpts {
46    // TODO: use `SocketAddr`
47    #[clap(long, env = "RW_LISTEN_ADDR", default_value = "127.0.0.1:5690")]
48    pub listen_addr: String,
49
50    /// The address for contacting this instance of the service.
51    /// This would be synonymous with the service's "public address"
52    /// or "identifying address".
53    /// It will serve as a unique identifier in cluster
54    /// membership and leader election. Must be specified for meta backend.
55    #[clap(long, env = "RW_ADVERTISE_ADDR", default_value = "127.0.0.1:5690")]
56    pub advertise_addr: String,
57
58    #[clap(long, env = "RW_DASHBOARD_HOST")]
59    pub dashboard_host: Option<String>,
60
61    /// We will start a http server at this address via `MetricsManager`.
62    /// Then the prometheus instance will poll the metrics from this address.
63    #[clap(long, env = "RW_PROMETHEUS_HOST", alias = "prometheus-host")]
64    pub prometheus_listener_addr: Option<String>,
65
66    /// Endpoint of the SQL service, make it non-option when SQL service is required.
67    #[clap(long, hide = true, env = "RW_SQL_ENDPOINT")]
68    pub sql_endpoint: Option<Secret<String>>,
69
70    /// Username of sql backend, required when meta backend set to MySQL or PostgreSQL.
71    #[clap(long, hide = true, env = "RW_SQL_USERNAME", default_value = "")]
72    pub sql_username: String,
73
74    /// Password of sql backend, required when meta backend set to MySQL or PostgreSQL.
75    #[clap(long, hide = true, env = "RW_SQL_PASSWORD", default_value = "")]
76    pub sql_password: Secret<String>,
77
78    /// Database of sql backend, required when meta backend set to MySQL or PostgreSQL.
79    #[clap(long, hide = true, env = "RW_SQL_DATABASE", default_value = "")]
80    pub sql_database: String,
81
82    /// Params for the URL connection, such as `sslmode=disable`.
83    /// Example: `param1=value1&param2=value2`
84    #[clap(long, hide = true, env = "RW_SQL_URL_PARAMS")]
85    pub sql_url_params: Option<String>,
86
87    /// The HTTP REST-API address of the Prometheus instance associated to this cluster.
88    /// This address is used to serve `PromQL` queries to Prometheus.
89    /// It is also used by Grafana Dashboard Service to fetch metrics and visualize them.
90    #[clap(long, env = "RW_PROMETHEUS_ENDPOINT")]
91    pub prometheus_endpoint: Option<String>,
92
93    /// The additional selector used when querying Prometheus.
94    ///
95    /// The format is same as `PromQL`. Example: `instance="foo",namespace="bar"`
96    #[clap(long, env = "RW_PROMETHEUS_SELECTOR")]
97    pub prometheus_selector: Option<String>,
98
99    /// Default tag for the endpoint created when creating a privatelink connection.
100    /// Will be appended to the tags specified in the `tags` field in with clause in `create
101    /// connection`.
102    #[clap(long, hide = true, env = "RW_PRIVATELINK_ENDPOINT_DEFAULT_TAGS")]
103    pub privatelink_endpoint_default_tags: Option<String>,
104
105    #[clap(long, hide = true, env = "RW_VPC_ID")]
106    pub vpc_id: Option<String>,
107
108    #[clap(long, hide = true, env = "RW_VPC_SECURITY_GROUP_ID")]
109    pub security_group_id: Option<String>,
110
111    /// The path of `risingwave.toml` configuration file.
112    ///
113    /// If empty, default configuration values will be used.
114    #[clap(long, env = "RW_CONFIG_PATH", default_value = "")]
115    pub config_path: String,
116
117    #[clap(long, hide = true, env = "RW_BACKEND", value_enum)]
118    #[override_opts(path = meta.backend)]
119    pub backend: Option<MetaBackend>,
120
121    /// The interval of periodic barrier.
122    #[clap(long, hide = true, env = "RW_BARRIER_INTERVAL_MS")]
123    #[override_opts(path = system.barrier_interval_ms)]
124    pub barrier_interval_ms: Option<u32>,
125
126    /// Target size of the Sstable.
127    #[clap(long, hide = true, env = "RW_SSTABLE_SIZE_MB")]
128    #[override_opts(path = system.sstable_size_mb)]
129    pub sstable_size_mb: Option<u32>,
130
131    /// Size of each block in bytes in SST.
132    #[clap(long, hide = true, env = "RW_BLOCK_SIZE_KB")]
133    #[override_opts(path = system.block_size_kb)]
134    pub block_size_kb: Option<u32>,
135
136    /// False positive probability of bloom filter.
137    #[clap(long, hide = true, env = "RW_BLOOM_FALSE_POSITIVE")]
138    #[override_opts(path = system.bloom_false_positive)]
139    pub bloom_false_positive: Option<f64>,
140
141    /// State store url
142    #[clap(long, hide = true, env = "RW_STATE_STORE")]
143    #[override_opts(path = system.state_store)]
144    pub state_store: Option<String>,
145
146    /// Remote directory for storing data and metadata objects.
147    #[clap(long, hide = true, env = "RW_DATA_DIRECTORY")]
148    #[override_opts(path = system.data_directory)]
149    pub data_directory: Option<String>,
150
151    /// Whether config object storage bucket lifecycle to purge stale data.
152    #[clap(long, hide = true, env = "RW_DO_NOT_CONFIG_BUCKET_LIFECYCLE")]
153    #[override_opts(path = meta.do_not_config_object_storage_lifecycle)]
154    pub do_not_config_object_storage_lifecycle: Option<bool>,
155
156    /// Remote storage url for storing snapshots.
157    #[clap(long, hide = true, env = "RW_BACKUP_STORAGE_URL")]
158    #[override_opts(path = system.backup_storage_url)]
159    pub backup_storage_url: Option<String>,
160
161    /// Remote directory for storing snapshots.
162    #[clap(long, hide = true, env = "RW_BACKUP_STORAGE_DIRECTORY")]
163    #[override_opts(path = system.backup_storage_directory)]
164    pub backup_storage_directory: Option<String>,
165
166    /// Enable heap profile dump when memory usage is high.
167    #[clap(long, hide = true, env = "RW_HEAP_PROFILING_DIR")]
168    #[override_opts(path = server.heap_profiling.dir)]
169    pub heap_profiling_dir: Option<String>,
170
171    /// Exit if idle for a certain period of time.
172    #[clap(long, hide = true, env = "RW_DANGEROUS_MAX_IDLE_SECS")]
173    #[override_opts(path = meta.dangerous_max_idle_secs)]
174    pub dangerous_max_idle_secs: Option<u64>,
175
176    /// Endpoint of the connector node.
177    #[deprecated = "connector node has been deprecated."]
178    #[clap(long, hide = true, env = "RW_CONNECTOR_RPC_ENDPOINT")]
179    pub connector_rpc_endpoint: Option<String>,
180
181    /// The license key to activate enterprise features.
182    #[clap(long, hide = true, env = "RW_LICENSE_KEY")]
183    #[override_opts(path = system.license_key)]
184    pub license_key: Option<LicenseKey>,
185
186    /// The path of the license key file to be watched and hot-reloaded.
187    #[clap(long, env = "RW_LICENSE_KEY_PATH")]
188    pub license_key_path: Option<PathBuf>,
189
190    /// 128-bit AES key for secret store in HEX format.
191    #[educe(Debug(ignore))] // TODO: use newtype to redact debug impl
192    #[clap(long, hide = true, env = "RW_SECRET_STORE_PRIVATE_KEY_HEX")]
193    pub secret_store_private_key_hex: Option<String>,
194
195    /// The path of the temp secret file directory.
196    #[clap(
197        long,
198        hide = true,
199        env = "RW_TEMP_SECRET_FILE_DIR",
200        default_value = "./secrets"
201    )]
202    pub temp_secret_file_dir: String,
203}
204
205impl risingwave_common::opts::Opts for MetaNodeOpts {
206    fn name() -> &'static str {
207        "meta"
208    }
209
210    fn meta_addr(&self) -> MetaAddressStrategy {
211        format!("http://{}", self.listen_addr)
212            .parse()
213            .expect("invalid listen address")
214    }
215}
216
217use std::future::Future;
218use std::pin::Pin;
219use std::sync::Arc;
220
221use risingwave_common::config::{MetaBackend, RwConfig, load_config};
222use tracing::info;
223
224/// Start meta node
225pub fn start(
226    opts: MetaNodeOpts,
227    shutdown: CancellationToken,
228) -> Pin<Box<dyn Future<Output = ()> + Send>> {
229    // WARNING: don't change the function signature. Making it `async fn` will cause
230    // slow compile in release mode.
231    Box::pin(async move {
232        info!("Starting meta node");
233        info!("> options: {:?}", opts);
234        let config = load_config(&opts.config_path, &opts);
235        info!("> config: {:?}", config);
236        info!("> version: {} ({})", RW_VERSION, GIT_SHA);
237        let listen_addr = opts.listen_addr.parse().unwrap();
238        let dashboard_addr = opts.dashboard_host.map(|x| x.parse().unwrap());
239        let prometheus_addr = opts.prometheus_listener_addr.map(|x| x.parse().unwrap());
240        let meta_store_config = config.meta.meta_store_config.clone();
241        let backend = match config.meta.backend {
242            MetaBackend::Mem => {
243                if opts.sql_endpoint.is_some() {
244                    tracing::warn!("`--sql-endpoint` is ignored when using `mem` backend");
245                }
246                MetaStoreBackend::Mem
247            }
248            MetaBackend::Sql => MetaStoreBackend::Sql {
249                endpoint: opts
250                    .sql_endpoint
251                    .expect("sql endpoint is required")
252                    .expose_secret()
253                    .clone(),
254                config: meta_store_config,
255            },
256            MetaBackend::Sqlite => MetaStoreBackend::Sql {
257                endpoint: format!(
258                    "sqlite://{}?mode=rwc",
259                    opts.sql_endpoint
260                        .expect("sql endpoint is required")
261                        .expose_secret()
262                ),
263                config: meta_store_config,
264            },
265            MetaBackend::Postgres => MetaStoreBackend::Sql {
266                endpoint: format!(
267                    "postgres://{}:{}@{}/{}{}",
268                    opts.sql_username,
269                    opts.sql_password.expose_secret(),
270                    opts.sql_endpoint
271                        .expect("sql endpoint is required")
272                        .expose_secret(),
273                    opts.sql_database,
274                    if let Some(params) = &opts.sql_url_params
275                        && !params.is_empty()
276                    {
277                        format!("?{}", params)
278                    } else {
279                        "".to_owned()
280                    }
281                ),
282                config: meta_store_config,
283            },
284            MetaBackend::Mysql => MetaStoreBackend::Sql {
285                endpoint: format!(
286                    "mysql://{}:{}@{}/{}{}",
287                    opts.sql_username,
288                    opts.sql_password.expose_secret(),
289                    opts.sql_endpoint
290                        .expect("sql endpoint is required")
291                        .expose_secret(),
292                    opts.sql_database,
293                    if let Some(params) = &opts.sql_url_params
294                        && !params.is_empty()
295                    {
296                        format!("?{}", params)
297                    } else {
298                        "".to_owned()
299                    }
300                ),
301                config: meta_store_config,
302            },
303        };
304        validate_config(&config);
305
306        let total_memory_bytes = resource_util::memory::system_memory_available_bytes();
307        let heap_profiler =
308            HeapProfiler::new(total_memory_bytes, config.server.heap_profiling.clone());
309        // Run a background heap profiler
310        heap_profiler.start();
311
312        let secret_store_private_key = opts
313            .secret_store_private_key_hex
314            .map(|key| hex::decode(key).unwrap());
315        let max_heartbeat_interval =
316            Duration::from_secs(config.meta.max_heartbeat_interval_secs as u64);
317        let max_idle_ms = config.meta.dangerous_max_idle_secs.unwrap_or(0) * 1000;
318        let in_flight_barrier_nums = config.streaming.in_flight_barrier_nums;
319        let privatelink_endpoint_default_tags =
320            opts.privatelink_endpoint_default_tags.map(|tags| {
321                tags.split(',')
322                    .map(|s| {
323                        let key_val = s.split_once('=').unwrap();
324                        (key_val.0.to_owned(), key_val.1.to_owned())
325                    })
326                    .collect()
327            });
328
329        let add_info = AddressInfo {
330            advertise_addr: opts.advertise_addr.to_owned(),
331            listen_addr,
332            prometheus_addr,
333            dashboard_addr,
334        };
335
336        const MIN_TIMEOUT_INTERVAL_SEC: u64 = 20;
337        let compaction_task_max_progress_interval_secs = {
338            let retry_config = &config.storage.object_store.retry;
339            let max_streming_read_timeout_ms = (retry_config.streaming_read_attempt_timeout_ms
340                + retry_config.req_backoff_max_delay_ms)
341                * retry_config.streaming_read_retry_attempts as u64;
342            let max_streaming_upload_timeout_ms = (retry_config
343                .streaming_upload_attempt_timeout_ms
344                + retry_config.req_backoff_max_delay_ms)
345                * retry_config.streaming_upload_retry_attempts as u64;
346            let max_upload_timeout_ms = (retry_config.upload_attempt_timeout_ms
347                + retry_config.req_backoff_max_delay_ms)
348                * retry_config.upload_retry_attempts as u64;
349            let max_read_timeout_ms = (retry_config.read_attempt_timeout_ms
350                + retry_config.req_backoff_max_delay_ms)
351                * retry_config.read_retry_attempts as u64;
352            let max_timeout_ms = max_streming_read_timeout_ms
353                .max(max_upload_timeout_ms)
354                .max(max_streaming_upload_timeout_ms)
355                .max(max_read_timeout_ms)
356                .max(config.meta.compaction_task_max_progress_interval_secs * 1000);
357            max_timeout_ms / 1000
358        } + MIN_TIMEOUT_INTERVAL_SEC;
359
360        rpc_serve(
361            add_info,
362            backend,
363            max_heartbeat_interval,
364            config.meta.meta_leader_lease_secs,
365            MetaOpts {
366                enable_recovery: !config.meta.disable_recovery,
367                disable_automatic_parallelism_control: config
368                    .meta
369                    .disable_automatic_parallelism_control,
370                parallelism_control_batch_size: config.meta.parallelism_control_batch_size,
371                parallelism_control_trigger_period_sec: config
372                    .meta
373                    .parallelism_control_trigger_period_sec,
374                parallelism_control_trigger_first_delay_sec: config
375                    .meta
376                    .parallelism_control_trigger_first_delay_sec,
377                in_flight_barrier_nums,
378                max_idle_ms,
379                compaction_deterministic_test: config.meta.enable_compaction_deterministic,
380                default_parallelism: config.meta.default_parallelism,
381                vacuum_interval_sec: config.meta.vacuum_interval_sec,
382                time_travel_vacuum_interval_sec: config
383                    .meta
384                    .developer
385                    .time_travel_vacuum_interval_sec,
386                vacuum_spin_interval_ms: config.meta.vacuum_spin_interval_ms,
387                hummock_version_checkpoint_interval_sec: config
388                    .meta
389                    .hummock_version_checkpoint_interval_sec,
390                enable_hummock_data_archive: config.meta.enable_hummock_data_archive,
391                hummock_time_travel_snapshot_interval: config
392                    .meta
393                    .hummock_time_travel_snapshot_interval,
394                hummock_time_travel_sst_info_fetch_batch_size: config
395                    .meta
396                    .developer
397                    .hummock_time_travel_sst_info_fetch_batch_size,
398                hummock_time_travel_sst_info_insert_batch_size: config
399                    .meta
400                    .developer
401                    .hummock_time_travel_sst_info_insert_batch_size,
402                hummock_time_travel_epoch_version_insert_batch_size: config
403                    .meta
404                    .developer
405                    .hummock_time_travel_epoch_version_insert_batch_size,
406                hummock_gc_history_insert_batch_size: config
407                    .meta
408                    .developer
409                    .hummock_gc_history_insert_batch_size,
410                hummock_time_travel_filter_out_objects_batch_size: config
411                    .meta
412                    .developer
413                    .hummock_time_travel_filter_out_objects_batch_size,
414                hummock_time_travel_filter_out_objects_v1: config
415                    .meta
416                    .developer
417                    .hummock_time_travel_filter_out_objects_v1,
418                hummock_time_travel_filter_out_objects_list_version_batch_size: config
419                    .meta
420                    .developer
421                    .hummock_time_travel_filter_out_objects_list_version_batch_size,
422                hummock_time_travel_filter_out_objects_list_delta_batch_size: config
423                    .meta
424                    .developer
425                    .hummock_time_travel_filter_out_objects_list_delta_batch_size,
426                min_delta_log_num_for_hummock_version_checkpoint: config
427                    .meta
428                    .min_delta_log_num_for_hummock_version_checkpoint,
429                min_sst_retention_time_sec: config.meta.min_sst_retention_time_sec,
430                full_gc_interval_sec: config.meta.full_gc_interval_sec,
431                full_gc_object_limit: config.meta.full_gc_object_limit,
432                gc_history_retention_time_sec: config.meta.gc_history_retention_time_sec,
433                max_inflight_time_travel_query: config.meta.max_inflight_time_travel_query,
434                enable_committed_sst_sanity_check: config.meta.enable_committed_sst_sanity_check,
435                periodic_compaction_interval_sec: config.meta.periodic_compaction_interval_sec,
436                node_num_monitor_interval_sec: config.meta.node_num_monitor_interval_sec,
437                protect_drop_table_with_incoming_sink: config
438                    .meta
439                    .protect_drop_table_with_incoming_sink,
440                prometheus_endpoint: opts.prometheus_endpoint,
441                prometheus_selector: opts.prometheus_selector,
442                vpc_id: opts.vpc_id,
443                security_group_id: opts.security_group_id,
444                privatelink_endpoint_default_tags,
445                periodic_space_reclaim_compaction_interval_sec: config
446                    .meta
447                    .periodic_space_reclaim_compaction_interval_sec,
448                telemetry_enabled: config.server.telemetry_enabled,
449                periodic_ttl_reclaim_compaction_interval_sec: config
450                    .meta
451                    .periodic_ttl_reclaim_compaction_interval_sec,
452                periodic_tombstone_reclaim_compaction_interval_sec: config
453                    .meta
454                    .periodic_tombstone_reclaim_compaction_interval_sec,
455                periodic_scheduling_compaction_group_split_interval_sec: config
456                    .meta
457                    .periodic_scheduling_compaction_group_split_interval_sec,
458                periodic_scheduling_compaction_group_merge_interval_sec: config
459                    .meta
460                    .periodic_scheduling_compaction_group_merge_interval_sec,
461                compaction_group_merge_dimension_threshold: config
462                    .meta
463                    .compaction_group_merge_dimension_threshold,
464                table_high_write_throughput_threshold: config
465                    .meta
466                    .table_high_write_throughput_threshold,
467                table_low_write_throughput_threshold: config
468                    .meta
469                    .table_low_write_throughput_threshold,
470                partition_vnode_count: config.meta.partition_vnode_count,
471                compact_task_table_size_partition_threshold_low: config
472                    .meta
473                    .compact_task_table_size_partition_threshold_low,
474                compact_task_table_size_partition_threshold_high: config
475                    .meta
476                    .compact_task_table_size_partition_threshold_high,
477                do_not_config_object_storage_lifecycle: config
478                    .meta
479                    .do_not_config_object_storage_lifecycle,
480                compaction_task_max_heartbeat_interval_secs: config
481                    .meta
482                    .compaction_task_max_heartbeat_interval_secs,
483                compaction_task_max_progress_interval_secs,
484                compaction_config: Some(config.meta.compaction_config),
485                hybrid_partition_node_count: config.meta.hybrid_partition_vnode_count,
486                event_log_enabled: config.meta.event_log_enabled,
487                event_log_channel_max_size: config.meta.event_log_channel_max_size,
488                advertise_addr: opts.advertise_addr,
489                cached_traces_num: config.meta.developer.cached_traces_num,
490                cached_traces_memory_limit_bytes: config
491                    .meta
492                    .developer
493                    .cached_traces_memory_limit_bytes,
494                enable_trivial_move: config.meta.developer.enable_trivial_move,
495                enable_check_task_level_overlap: config
496                    .meta
497                    .developer
498                    .enable_check_task_level_overlap,
499                enable_dropped_column_reclaim: config.meta.enable_dropped_column_reclaim,
500                split_group_size_ratio: config.meta.split_group_size_ratio,
501                table_stat_high_write_throughput_ratio_for_split: config
502                    .meta
503                    .table_stat_high_write_throughput_ratio_for_split,
504                table_stat_low_write_throughput_ratio_for_merge: config
505                    .meta
506                    .table_stat_low_write_throughput_ratio_for_merge,
507                table_stat_throuput_window_seconds_for_split: config
508                    .meta
509                    .table_stat_throuput_window_seconds_for_split,
510                table_stat_throuput_window_seconds_for_merge: config
511                    .meta
512                    .table_stat_throuput_window_seconds_for_merge,
513                object_store_config: config.storage.object_store,
514                max_trivial_move_task_count_per_loop: config
515                    .meta
516                    .developer
517                    .max_trivial_move_task_count_per_loop,
518                max_get_task_probe_times: config.meta.developer.max_get_task_probe_times,
519                secret_store_private_key,
520                temp_secret_file_dir: opts.temp_secret_file_dir,
521                actor_cnt_per_worker_parallelism_hard_limit: config
522                    .meta
523                    .developer
524                    .actor_cnt_per_worker_parallelism_hard_limit,
525                actor_cnt_per_worker_parallelism_soft_limit: config
526                    .meta
527                    .developer
528                    .actor_cnt_per_worker_parallelism_soft_limit,
529                license_key_path: opts.license_key_path,
530                compute_client_config: config.meta.developer.compute_client_config.clone(),
531                stream_client_config: config.meta.developer.stream_client_config.clone(),
532                frontend_client_config: config.meta.developer.frontend_client_config.clone(),
533                redact_sql_option_keywords: Arc::new(
534                    config
535                        .batch
536                        .redact_sql_option_keywords
537                        .into_iter()
538                        .collect(),
539                ),
540                cdc_table_split_init_sleep_interval_splits: config
541                    .meta
542                    .cdc_table_split_init_sleep_interval_splits,
543                cdc_table_split_init_sleep_duration_millis: config
544                    .meta
545                    .cdc_table_split_init_sleep_duration_millis,
546                cdc_table_split_init_insert_batch_size: config
547                    .meta
548                    .cdc_table_split_init_insert_batch_size,
549            },
550            config.system.into_init_system_params(),
551            Default::default(),
552            shutdown,
553        )
554        .await
555        .unwrap();
556    })
557}
558
559fn validate_config(config: &RwConfig) {
560    if config.meta.meta_leader_lease_secs <= 2 {
561        let error_msg = "meta leader lease secs should be larger than 2";
562        tracing::error!(error_msg);
563        panic!("{}", error_msg);
564    }
565
566    if config.meta.parallelism_control_batch_size == 0 {
567        let error_msg = "parallelism control batch size should be larger than 0";
568        tracing::error!(error_msg);
569        panic!("{}", error_msg);
570    }
571}