1#![feature(coverage_attribute)]
16
17mod server;
18
19use std::path::PathBuf;
20use std::time::Duration;
21
22use clap::Parser;
23use educe::Educe;
24pub use error::{MetaError, MetaResult};
25use redact::Secret;
26use risingwave_common::config::OverrideConfig;
27use risingwave_common::license::LicenseKey;
28use risingwave_common::util::meta_addr::MetaAddressStrategy;
29use risingwave_common::util::resource_util;
30use risingwave_common::util::tokio_util::sync::CancellationToken;
31use risingwave_common::{GIT_SHA, RW_VERSION};
32use risingwave_common_heap_profiling::HeapProfiler;
33use risingwave_meta::*;
34use risingwave_meta_service::*;
35pub use rpc::{ElectionClient, ElectionMember};
36use server::rpc_serve;
37pub use server::started::get as is_server_started;
38
39use crate::manager::MetaOpts;
40
/// Command-line options of the meta node; every option can also be supplied through the
/// `RW_*` environment variable named in its `env` attribute.
#[derive(Educe, Clone, Parser, OverrideConfig)]
#[educe(Debug)]
#[command(version, about = "The central metadata management service")]
pub struct MetaNodeOpts {
    /// The address this meta node listens on.
    #[clap(long, env = "RW_LISTEN_ADDR", default_value = "127.0.0.1:5690")]
    pub listen_addr: String,

    /// The address advertised for this meta node; it may differ from the listen address.
    #[clap(long, env = "RW_ADVERTISE_ADDR", default_value = "127.0.0.1:5690")]
    pub advertise_addr: String,

    /// Address for the dashboard, if any.
    #[clap(long, env = "RW_DASHBOARD_HOST")]
    pub dashboard_host: Option<String>,

    /// Prometheus listener address, if any (also accepted as `--prometheus-host`).
    #[clap(long, env = "RW_PROMETHEUS_HOST", alias = "prometheus-host")]
    pub prometheus_listener_addr: Option<String>,

    /// Endpoint of the SQL meta store, required by every non-`mem` backend: the full URL for
    /// `sql`, the database file path for `sqlite`, or `host[:port]` for `postgres`/`mysql`.
    #[clap(long, hide = true, env = "RW_SQL_ENDPOINT")]
    pub sql_endpoint: Option<Secret<String>>,

    /// Username placed in the `postgres`/`mysql` connection URL (not escaped).
    #[clap(long, hide = true, env = "RW_SQL_USERNAME", default_value = "")]
    pub sql_username: String,

    /// Password placed in the `postgres`/`mysql` connection URL (not escaped).
    #[clap(long, hide = true, env = "RW_SQL_PASSWORD", default_value = "")]
    pub sql_password: Secret<String>,

    /// Database name placed in the `postgres`/`mysql` connection URL.
    #[clap(long, hide = true, env = "RW_SQL_DATABASE", default_value = "")]
    pub sql_database: String,

    /// Extra query parameters appended as `?<params>` to the `postgres`/`mysql` connection
    /// URL when non-empty.
    #[clap(long, hide = true, env = "RW_SQL_URL_PARAMS")]
    pub sql_url_params: Option<String>,

    /// Prometheus endpoint forwarded to the meta service options.
    #[clap(long, env = "RW_PROMETHEUS_ENDPOINT")]
    pub prometheus_endpoint: Option<String>,

    /// Prometheus selector forwarded to the meta service options.
    #[clap(long, env = "RW_PROMETHEUS_SELECTOR")]
    pub prometheus_selector: Option<String>,

    /// Default privatelink endpoint tags, as comma-separated `key=value` pairs.
    #[clap(long, hide = true, env = "RW_PRIVATELINK_ENDPOINT_DEFAULT_TAGS")]
    pub privatelink_endpoint_default_tags: Option<String>,

    /// VPC id forwarded to the meta service options.
    #[clap(long, hide = true, env = "RW_VPC_ID")]
    pub vpc_id: Option<String>,

    /// VPC security group id forwarded to the meta service options.
    #[clap(long, hide = true, env = "RW_VPC_SECURITY_GROUP_ID")]
    pub security_group_id: Option<String>,

    /// Path of the config file loaded at startup.
    #[clap(long, env = "RW_CONFIG_PATH", default_value = "")]
    pub config_path: String,

    /// Meta store backend; overrides `meta.backend` from the config file.
    #[clap(long, hide = true, env = "RW_BACKEND", value_enum)]
    #[override_opts(path = meta.backend)]
    pub backend: Option<MetaBackend>,

    /// Overrides `system.barrier_interval_ms` from the config file.
    #[clap(long, hide = true, env = "RW_BARRIER_INTERVAL_MS")]
    #[override_opts(path = system.barrier_interval_ms)]
    pub barrier_interval_ms: Option<u32>,

    /// Overrides `system.sstable_size_mb` from the config file.
    #[clap(long, hide = true, env = "RW_SSTABLE_SIZE_MB")]
    #[override_opts(path = system.sstable_size_mb)]
    pub sstable_size_mb: Option<u32>,

    /// Overrides `system.block_size_kb` from the config file.
    #[clap(long, hide = true, env = "RW_BLOCK_SIZE_KB")]
    #[override_opts(path = system.block_size_kb)]
    pub block_size_kb: Option<u32>,

    /// Overrides `system.bloom_false_positive` from the config file.
    #[clap(long, hide = true, env = "RW_BLOOM_FALSE_POSITIVE")]
    #[override_opts(path = system.bloom_false_positive)]
    pub bloom_false_positive: Option<f64>,

    /// Overrides `system.state_store` from the config file.
    #[clap(long, hide = true, env = "RW_STATE_STORE")]
    #[override_opts(path = system.state_store)]
    pub state_store: Option<String>,

    /// Overrides `system.data_directory` from the config file.
    #[clap(long, hide = true, env = "RW_DATA_DIRECTORY")]
    #[override_opts(path = system.data_directory)]
    pub data_directory: Option<String>,

    /// Overrides `meta.do_not_config_object_storage_lifecycle` from the config file.
    #[clap(long, hide = true, env = "RW_DO_NOT_CONFIG_BUCKET_LIFECYCLE")]
    #[override_opts(path = meta.do_not_config_object_storage_lifecycle)]
    pub do_not_config_object_storage_lifecycle: Option<bool>,

    /// Overrides `system.backup_storage_url` from the config file.
    #[clap(long, hide = true, env = "RW_BACKUP_STORAGE_URL")]
    #[override_opts(path = system.backup_storage_url)]
    pub backup_storage_url: Option<String>,

    /// Overrides `system.backup_storage_directory` from the config file.
    #[clap(long, hide = true, env = "RW_BACKUP_STORAGE_DIRECTORY")]
    #[override_opts(path = system.backup_storage_directory)]
    pub backup_storage_directory: Option<String>,

    /// Overrides `server.heap_profiling.dir` from the config file.
    #[clap(long, hide = true, env = "RW_HEAP_PROFILING_DIR")]
    #[override_opts(path = server.heap_profiling.dir)]
    pub heap_profiling_dir: Option<String>,

    /// Overrides `meta.dangerous_max_idle_secs` from the config file; `start` converts it to
    /// milliseconds (0 when unset) for the meta service options.
    #[clap(long, hide = true, env = "RW_DANGEROUS_MAX_IDLE_SECS")]
    #[override_opts(path = meta.dangerous_max_idle_secs)]
    pub dangerous_max_idle_secs: Option<u64>,

    /// Deprecated: the connector node is deprecated and `start` never reads this value.
    #[deprecated = "connector node has been deprecated."]
    #[clap(long, hide = true, env = "RW_CONNECTOR_RPC_ENDPOINT")]
    pub connector_rpc_endpoint: Option<String>,

    /// Overrides `system.license_key` from the config file.
    #[clap(long, hide = true, env = "RW_LICENSE_KEY")]
    #[override_opts(path = system.license_key)]
    pub license_key: Option<LicenseKey>,

    /// Path to the license key file, forwarded to the meta service options.
    #[clap(long, env = "RW_LICENSE_KEY_PATH")]
    pub license_key_path: Option<PathBuf>,

    /// Hex-encoded private key of the secret store; hex-decoded at startup and excluded from
    /// `Debug` output.
    #[educe(Debug(ignore))]
    #[clap(long, hide = true, env = "RW_SECRET_STORE_PRIVATE_KEY_HEX")]
    pub secret_store_private_key_hex: Option<String>,

    /// Directory for temporary secret files, forwarded to the meta service options.
    #[clap(
        long,
        hide = true,
        env = "RW_TEMP_SECRET_FILE_DIR",
        default_value = "./secrets"
    )]
    pub temp_secret_file_dir: String,
}
203
204impl risingwave_common::opts::Opts for MetaNodeOpts {
205 fn name() -> &'static str {
206 "meta"
207 }
208
209 fn meta_addr(&self) -> MetaAddressStrategy {
210 format!("http://{}", self.listen_addr)
211 .parse()
212 .expect("invalid listen address")
213 }
214}
215
216use std::future::Future;
217use std::pin::Pin;
218use std::sync::Arc;
219
220use risingwave_common::config::{MetaBackend, RwConfig, load_config};
221use tracing::info;
222
223pub fn start(
225 opts: MetaNodeOpts,
226 shutdown: CancellationToken,
227) -> Pin<Box<dyn Future<Output = ()> + Send>> {
228 Box::pin(async move {
231 info!("Starting meta node");
232 info!("> options: {:?}", opts);
233 let config = load_config(&opts.config_path, &opts);
234 info!("> config: {:?}", config);
235 info!("> version: {} ({})", RW_VERSION, GIT_SHA);
236 let listen_addr = opts.listen_addr.parse().unwrap();
237 let dashboard_addr = opts.dashboard_host.map(|x| x.parse().unwrap());
238 let prometheus_addr = opts.prometheus_listener_addr.map(|x| x.parse().unwrap());
239 let meta_store_config = config.meta.meta_store_config.clone();
240 let backend = match config.meta.backend {
241 MetaBackend::Mem => {
242 if opts.sql_endpoint.is_some() {
243 tracing::warn!("`--sql-endpoint` is ignored when using `mem` backend");
244 }
245 MetaStoreBackend::Mem
246 }
247 MetaBackend::Sql => MetaStoreBackend::Sql {
248 endpoint: opts
249 .sql_endpoint
250 .expect("sql endpoint is required")
251 .expose_secret()
252 .clone(),
253 config: meta_store_config,
254 },
255 MetaBackend::Sqlite => MetaStoreBackend::Sql {
256 endpoint: format!(
257 "sqlite://{}?mode=rwc",
258 opts.sql_endpoint
259 .expect("sql endpoint is required")
260 .expose_secret()
261 ),
262 config: meta_store_config,
263 },
264 MetaBackend::Postgres => MetaStoreBackend::Sql {
265 endpoint: format!(
266 "postgres://{}:{}@{}/{}{}",
267 opts.sql_username,
268 opts.sql_password.expose_secret(),
269 opts.sql_endpoint
270 .expect("sql endpoint is required")
271 .expose_secret(),
272 opts.sql_database,
273 if let Some(params) = &opts.sql_url_params
274 && !params.is_empty()
275 {
276 format!("?{}", params)
277 } else {
278 "".to_owned()
279 }
280 ),
281 config: meta_store_config,
282 },
283 MetaBackend::Mysql => MetaStoreBackend::Sql {
284 endpoint: format!(
285 "mysql://{}:{}@{}/{}{}",
286 opts.sql_username,
287 opts.sql_password.expose_secret(),
288 opts.sql_endpoint
289 .expect("sql endpoint is required")
290 .expose_secret(),
291 opts.sql_database,
292 if let Some(params) = &opts.sql_url_params
293 && !params.is_empty()
294 {
295 format!("?{}", params)
296 } else {
297 "".to_owned()
298 }
299 ),
300 config: meta_store_config,
301 },
302 };
303 validate_config(&config);
304
305 let total_memory_bytes = resource_util::memory::system_memory_available_bytes();
306 let heap_profiler =
307 HeapProfiler::new(total_memory_bytes, config.server.heap_profiling.clone());
308 heap_profiler.start();
310
311 let secret_store_private_key = opts
312 .secret_store_private_key_hex
313 .map(|key| hex::decode(key).unwrap());
314 let max_heartbeat_interval =
315 Duration::from_secs(config.meta.max_heartbeat_interval_secs as u64);
316 let max_idle_ms = config.meta.dangerous_max_idle_secs.unwrap_or(0) * 1000;
317 let in_flight_barrier_nums = config.streaming.in_flight_barrier_nums;
318 let privatelink_endpoint_default_tags =
319 opts.privatelink_endpoint_default_tags.map(|tags| {
320 tags.split(',')
321 .map(|s| {
322 let key_val = s.split_once('=').unwrap();
323 (key_val.0.to_owned(), key_val.1.to_owned())
324 })
325 .collect()
326 });
327
328 let add_info = AddressInfo {
329 advertise_addr: opts.advertise_addr.clone(),
330 listen_addr,
331 prometheus_addr,
332 dashboard_addr,
333 };
334
335 const MIN_TIMEOUT_INTERVAL_SEC: u64 = 20;
336 let compaction_task_max_progress_interval_secs = {
337 let retry_config = &config.storage.object_store.retry;
338 let max_streaming_read_timeout_ms = (retry_config.streaming_read_attempt_timeout_ms
339 + retry_config.req_backoff_max_delay_ms)
340 * retry_config.streaming_read_retry_attempts as u64;
341 let max_streaming_upload_timeout_ms = (retry_config
342 .streaming_upload_attempt_timeout_ms
343 + retry_config.req_backoff_max_delay_ms)
344 * retry_config.streaming_upload_retry_attempts as u64;
345 let max_upload_timeout_ms = (retry_config.upload_attempt_timeout_ms
346 + retry_config.req_backoff_max_delay_ms)
347 * retry_config.upload_retry_attempts as u64;
348 let max_read_timeout_ms = (retry_config.read_attempt_timeout_ms
349 + retry_config.req_backoff_max_delay_ms)
350 * retry_config.read_retry_attempts as u64;
351 let max_timeout_ms = max_streaming_read_timeout_ms
352 .max(max_upload_timeout_ms)
353 .max(max_streaming_upload_timeout_ms)
354 .max(max_read_timeout_ms)
355 .max(config.meta.compaction_task_max_progress_interval_secs * 1000);
356 max_timeout_ms / 1000
357 } + MIN_TIMEOUT_INTERVAL_SEC;
358
359 rpc_serve(
360 add_info,
361 backend,
362 max_heartbeat_interval,
363 config.meta.meta_leader_lease_secs,
364 config.server.clone(),
365 MetaOpts {
366 enable_recovery: !config.meta.disable_recovery,
367 disable_automatic_parallelism_control: config
368 .meta
369 .disable_automatic_parallelism_control,
370 parallelism_control_batch_size: config.meta.parallelism_control_batch_size,
371 parallelism_control_trigger_period_sec: config
372 .meta
373 .parallelism_control_trigger_period_sec,
374 parallelism_control_trigger_first_delay_sec: config
375 .meta
376 .parallelism_control_trigger_first_delay_sec,
377 in_flight_barrier_nums,
378 max_idle_ms,
379 compaction_deterministic_test: config.meta.enable_compaction_deterministic,
380 default_parallelism: config.meta.default_parallelism,
381 vacuum_interval_sec: config.meta.vacuum_interval_sec,
382 time_travel_vacuum_interval_sec: config
383 .meta
384 .developer
385 .time_travel_vacuum_interval_sec,
386 vacuum_spin_interval_ms: config.meta.vacuum_spin_interval_ms,
387 iceberg_gc_interval_sec: config.meta.iceberg_gc_interval_sec,
388 hummock_version_checkpoint_interval_sec: config
389 .meta
390 .hummock_version_checkpoint_interval_sec,
391 enable_hummock_data_archive: config.meta.enable_hummock_data_archive,
392 hummock_time_travel_snapshot_interval: config
393 .meta
394 .hummock_time_travel_snapshot_interval,
395 hummock_time_travel_sst_info_fetch_batch_size: config
396 .meta
397 .developer
398 .hummock_time_travel_sst_info_fetch_batch_size,
399 hummock_time_travel_sst_info_insert_batch_size: config
400 .meta
401 .developer
402 .hummock_time_travel_sst_info_insert_batch_size,
403 hummock_time_travel_epoch_version_insert_batch_size: config
404 .meta
405 .developer
406 .hummock_time_travel_epoch_version_insert_batch_size,
407 hummock_gc_history_insert_batch_size: config
408 .meta
409 .developer
410 .hummock_gc_history_insert_batch_size,
411 hummock_time_travel_filter_out_objects_batch_size: config
412 .meta
413 .developer
414 .hummock_time_travel_filter_out_objects_batch_size,
415 hummock_time_travel_filter_out_objects_v1: config
416 .meta
417 .developer
418 .hummock_time_travel_filter_out_objects_v1,
419 hummock_time_travel_filter_out_objects_list_version_batch_size: config
420 .meta
421 .developer
422 .hummock_time_travel_filter_out_objects_list_version_batch_size,
423 hummock_time_travel_filter_out_objects_list_delta_batch_size: config
424 .meta
425 .developer
426 .hummock_time_travel_filter_out_objects_list_delta_batch_size,
427 min_delta_log_num_for_hummock_version_checkpoint: config
428 .meta
429 .min_delta_log_num_for_hummock_version_checkpoint,
430 min_sst_retention_time_sec: config.meta.min_sst_retention_time_sec,
431 full_gc_interval_sec: config.meta.full_gc_interval_sec,
432 full_gc_object_limit: config.meta.full_gc_object_limit,
433 gc_history_retention_time_sec: config.meta.gc_history_retention_time_sec,
434 max_inflight_time_travel_query: config.meta.max_inflight_time_travel_query,
435 enable_committed_sst_sanity_check: config.meta.enable_committed_sst_sanity_check,
436 periodic_compaction_interval_sec: config.meta.periodic_compaction_interval_sec,
437 node_num_monitor_interval_sec: config.meta.node_num_monitor_interval_sec,
438 protect_drop_table_with_incoming_sink: config
439 .meta
440 .protect_drop_table_with_incoming_sink,
441 prometheus_endpoint: opts.prometheus_endpoint,
442 prometheus_selector: opts.prometheus_selector,
443 vpc_id: opts.vpc_id,
444 security_group_id: opts.security_group_id,
445 privatelink_endpoint_default_tags,
446 periodic_space_reclaim_compaction_interval_sec: config
447 .meta
448 .periodic_space_reclaim_compaction_interval_sec,
449 telemetry_enabled: config.server.telemetry_enabled,
450 periodic_ttl_reclaim_compaction_interval_sec: config
451 .meta
452 .periodic_ttl_reclaim_compaction_interval_sec,
453 periodic_tombstone_reclaim_compaction_interval_sec: config
454 .meta
455 .periodic_tombstone_reclaim_compaction_interval_sec,
456 periodic_scheduling_compaction_group_split_interval_sec: config
457 .meta
458 .periodic_scheduling_compaction_group_split_interval_sec,
459 periodic_scheduling_compaction_group_merge_interval_sec: config
460 .meta
461 .periodic_scheduling_compaction_group_merge_interval_sec,
462 compaction_group_merge_dimension_threshold: config
463 .meta
464 .compaction_group_merge_dimension_threshold,
465 table_high_write_throughput_threshold: config
466 .meta
467 .table_high_write_throughput_threshold,
468 table_low_write_throughput_threshold: config
469 .meta
470 .table_low_write_throughput_threshold,
471 partition_vnode_count: config.meta.partition_vnode_count,
472 compact_task_table_size_partition_threshold_low: config
473 .meta
474 .compact_task_table_size_partition_threshold_low,
475 compact_task_table_size_partition_threshold_high: config
476 .meta
477 .compact_task_table_size_partition_threshold_high,
478 do_not_config_object_storage_lifecycle: config
479 .meta
480 .do_not_config_object_storage_lifecycle,
481 compaction_task_max_heartbeat_interval_secs: config
482 .meta
483 .compaction_task_max_heartbeat_interval_secs,
484 compaction_task_max_progress_interval_secs,
485 compaction_config: Some(config.meta.compaction_config),
486 hybrid_partition_node_count: config.meta.hybrid_partition_vnode_count,
487 event_log_enabled: config.meta.event_log_enabled,
488 event_log_channel_max_size: config.meta.event_log_channel_max_size,
489 advertise_addr: opts.advertise_addr,
490 cached_traces_num: config.meta.developer.cached_traces_num,
491 cached_traces_memory_limit_bytes: config
492 .meta
493 .developer
494 .cached_traces_memory_limit_bytes,
495 enable_trivial_move: config.meta.developer.enable_trivial_move,
496 enable_check_task_level_overlap: config
497 .meta
498 .developer
499 .enable_check_task_level_overlap,
500 enable_dropped_column_reclaim: config.meta.enable_dropped_column_reclaim,
501 split_group_size_ratio: config.meta.split_group_size_ratio,
502 refresh_scheduler_interval_sec: config
503 .streaming
504 .developer
505 .refresh_scheduler_interval_sec,
506 table_stat_high_write_throughput_ratio_for_split: config
507 .meta
508 .table_stat_high_write_throughput_ratio_for_split,
509 table_stat_low_write_throughput_ratio_for_merge: config
510 .meta
511 .table_stat_low_write_throughput_ratio_for_merge,
512 table_stat_throuput_window_seconds_for_split: config
513 .meta
514 .table_stat_throuput_window_seconds_for_split,
515 table_stat_throuput_window_seconds_for_merge: config
516 .meta
517 .table_stat_throuput_window_seconds_for_merge,
518 object_store_config: config.storage.object_store,
519 max_trivial_move_task_count_per_loop: config
520 .meta
521 .developer
522 .max_trivial_move_task_count_per_loop,
523 max_get_task_probe_times: config.meta.developer.max_get_task_probe_times,
524 secret_store_private_key,
525 temp_secret_file_dir: opts.temp_secret_file_dir,
526 actor_cnt_per_worker_parallelism_hard_limit: config
527 .meta
528 .developer
529 .actor_cnt_per_worker_parallelism_hard_limit,
530 actor_cnt_per_worker_parallelism_soft_limit: config
531 .meta
532 .developer
533 .actor_cnt_per_worker_parallelism_soft_limit,
534 license_key_path: opts.license_key_path,
535 compute_client_config: config.meta.developer.compute_client_config.clone(),
536 stream_client_config: config.meta.developer.stream_client_config.clone(),
537 frontend_client_config: config.meta.developer.frontend_client_config.clone(),
538 redact_sql_option_keywords: Arc::new(
539 config
540 .batch
541 .redact_sql_option_keywords
542 .into_iter()
543 .collect(),
544 ),
545 cdc_table_split_init_sleep_interval_splits: config
546 .meta
547 .cdc_table_split_init_sleep_interval_splits,
548 cdc_table_split_init_sleep_duration_millis: config
549 .meta
550 .cdc_table_split_init_sleep_duration_millis,
551 cdc_table_split_init_insert_batch_size: config
552 .meta
553 .cdc_table_split_init_insert_batch_size,
554
555 enable_legacy_table_migration: config.meta.enable_legacy_table_migration,
556 pause_on_next_bootstrap_offline: config.meta.pause_on_next_bootstrap_offline,
557 },
558 config.system.into_init_system_params(),
559 Default::default(),
560 shutdown,
561 )
562 .await
563 .unwrap();
564 })
565}
566
567fn validate_config(config: &RwConfig) {
568 if config.meta.meta_leader_lease_secs <= 2 {
569 let error_msg = "meta leader lease secs should be larger than 2";
570 tracing::error!(error_msg);
571 panic!("{}", error_msg);
572 }
573
574 if config.meta.parallelism_control_batch_size == 0 {
575 let error_msg = "parallelism control batch size should be larger than 0";
576 tracing::error!(error_msg);
577 panic!("{}", error_msg);
578 }
579}