1#![feature(let_chains)]
16#![feature(coverage_attribute)]
17
18mod server;
19
20use std::path::PathBuf;
21use std::time::Duration;
22
23use clap::Parser;
24use educe::Educe;
25pub use error::{MetaError, MetaResult};
26use redact::Secret;
27use risingwave_common::config::OverrideConfig;
28use risingwave_common::license::LicenseKey;
29use risingwave_common::util::meta_addr::MetaAddressStrategy;
30use risingwave_common::util::resource_util;
31use risingwave_common::util::tokio_util::sync::CancellationToken;
32use risingwave_common::{GIT_SHA, RW_VERSION};
33use risingwave_common_heap_profiling::HeapProfiler;
34use risingwave_meta::*;
35use risingwave_meta_service::*;
36pub use rpc::{ElectionClient, ElectionMember};
37use server::rpc_serve;
38pub use server::started::get as is_server_started;
39
40use crate::manager::MetaOpts;
41
42#[derive(Educe, Clone, Parser, OverrideConfig)]
43#[educe(Debug)]
44#[command(version, about = "The central metadata management service")]
45pub struct MetaNodeOpts {
46 #[clap(long, env = "RW_LISTEN_ADDR", default_value = "127.0.0.1:5690")]
48 pub listen_addr: String,
49
50 #[clap(long, env = "RW_ADVERTISE_ADDR", default_value = "127.0.0.1:5690")]
56 pub advertise_addr: String,
57
58 #[clap(long, env = "RW_DASHBOARD_HOST")]
59 pub dashboard_host: Option<String>,
60
61 #[clap(long, env = "RW_PROMETHEUS_HOST", alias = "prometheus-host")]
64 pub prometheus_listener_addr: Option<String>,
65
66 #[clap(long, hide = true, env = "RW_SQL_ENDPOINT")]
68 pub sql_endpoint: Option<Secret<String>>,
69
70 #[clap(long, hide = true, env = "RW_SQL_USERNAME", default_value = "")]
72 pub sql_username: String,
73
74 #[clap(long, hide = true, env = "RW_SQL_PASSWORD", default_value = "")]
76 pub sql_password: Secret<String>,
77
78 #[clap(long, hide = true, env = "RW_SQL_DATABASE", default_value = "")]
80 pub sql_database: String,
81
82 #[clap(long, hide = true, env = "RW_SQL_URL_PARAMS")]
85 pub sql_url_params: Option<String>,
86
87 #[clap(long, env = "RW_PROMETHEUS_ENDPOINT")]
91 pub prometheus_endpoint: Option<String>,
92
93 #[clap(long, env = "RW_PROMETHEUS_SELECTOR")]
97 pub prometheus_selector: Option<String>,
98
99 #[clap(long, hide = true, env = "RW_PRIVATELINK_ENDPOINT_DEFAULT_TAGS")]
103 pub privatelink_endpoint_default_tags: Option<String>,
104
105 #[clap(long, hide = true, env = "RW_VPC_ID")]
106 pub vpc_id: Option<String>,
107
108 #[clap(long, hide = true, env = "RW_VPC_SECURITY_GROUP_ID")]
109 pub security_group_id: Option<String>,
110
111 #[clap(long, env = "RW_CONFIG_PATH", default_value = "")]
115 pub config_path: String,
116
117 #[clap(long, hide = true, env = "RW_BACKEND", value_enum)]
118 #[override_opts(path = meta.backend)]
119 pub backend: Option<MetaBackend>,
120
121 #[clap(long, hide = true, env = "RW_BARRIER_INTERVAL_MS")]
123 #[override_opts(path = system.barrier_interval_ms)]
124 pub barrier_interval_ms: Option<u32>,
125
126 #[clap(long, hide = true, env = "RW_SSTABLE_SIZE_MB")]
128 #[override_opts(path = system.sstable_size_mb)]
129 pub sstable_size_mb: Option<u32>,
130
131 #[clap(long, hide = true, env = "RW_BLOCK_SIZE_KB")]
133 #[override_opts(path = system.block_size_kb)]
134 pub block_size_kb: Option<u32>,
135
136 #[clap(long, hide = true, env = "RW_BLOOM_FALSE_POSITIVE")]
138 #[override_opts(path = system.bloom_false_positive)]
139 pub bloom_false_positive: Option<f64>,
140
141 #[clap(long, hide = true, env = "RW_STATE_STORE")]
143 #[override_opts(path = system.state_store)]
144 pub state_store: Option<String>,
145
146 #[clap(long, hide = true, env = "RW_DATA_DIRECTORY")]
148 #[override_opts(path = system.data_directory)]
149 pub data_directory: Option<String>,
150
151 #[clap(long, hide = true, env = "RW_DO_NOT_CONFIG_BUCKET_LIFECYCLE")]
153 #[override_opts(path = meta.do_not_config_object_storage_lifecycle)]
154 pub do_not_config_object_storage_lifecycle: Option<bool>,
155
156 #[clap(long, hide = true, env = "RW_BACKUP_STORAGE_URL")]
158 #[override_opts(path = system.backup_storage_url)]
159 pub backup_storage_url: Option<String>,
160
161 #[clap(long, hide = true, env = "RW_BACKUP_STORAGE_DIRECTORY")]
163 #[override_opts(path = system.backup_storage_directory)]
164 pub backup_storage_directory: Option<String>,
165
166 #[clap(long, hide = true, env = "RW_HEAP_PROFILING_DIR")]
168 #[override_opts(path = server.heap_profiling.dir)]
169 pub heap_profiling_dir: Option<String>,
170
171 #[clap(long, hide = true, env = "RW_DANGEROUS_MAX_IDLE_SECS")]
173 #[override_opts(path = meta.dangerous_max_idle_secs)]
174 pub dangerous_max_idle_secs: Option<u64>,
175
176 #[deprecated = "connector node has been deprecated."]
178 #[clap(long, hide = true, env = "RW_CONNECTOR_RPC_ENDPOINT")]
179 pub connector_rpc_endpoint: Option<String>,
180
181 #[clap(long, hide = true, env = "RW_LICENSE_KEY")]
183 #[override_opts(path = system.license_key)]
184 pub license_key: Option<LicenseKey>,
185
186 #[clap(long, env = "RW_LICENSE_KEY_PATH")]
188 pub license_key_path: Option<PathBuf>,
189
190 #[educe(Debug(ignore))] #[clap(long, hide = true, env = "RW_SECRET_STORE_PRIVATE_KEY_HEX")]
193 pub secret_store_private_key_hex: Option<String>,
194
195 #[clap(
197 long,
198 hide = true,
199 env = "RW_TEMP_SECRET_FILE_DIR",
200 default_value = "./secrets"
201 )]
202 pub temp_secret_file_dir: String,
203}
204
205impl risingwave_common::opts::Opts for MetaNodeOpts {
206 fn name() -> &'static str {
207 "meta"
208 }
209
210 fn meta_addr(&self) -> MetaAddressStrategy {
211 format!("http://{}", self.listen_addr)
212 .parse()
213 .expect("invalid listen address")
214 }
215}
216
217use std::future::Future;
218use std::pin::Pin;
219use std::sync::Arc;
220
221use risingwave_common::config::{MetaBackend, RwConfig, load_config};
222use tracing::info;
223
224pub fn start(
226 opts: MetaNodeOpts,
227 shutdown: CancellationToken,
228) -> Pin<Box<dyn Future<Output = ()> + Send>> {
229 Box::pin(async move {
232 info!("Starting meta node");
233 info!("> options: {:?}", opts);
234 let config = load_config(&opts.config_path, &opts);
235 info!("> config: {:?}", config);
236 info!("> version: {} ({})", RW_VERSION, GIT_SHA);
237 let listen_addr = opts.listen_addr.parse().unwrap();
238 let dashboard_addr = opts.dashboard_host.map(|x| x.parse().unwrap());
239 let prometheus_addr = opts.prometheus_listener_addr.map(|x| x.parse().unwrap());
240 let meta_store_config = config.meta.meta_store_config.clone();
241 let backend = match config.meta.backend {
242 MetaBackend::Mem => {
243 if opts.sql_endpoint.is_some() {
244 tracing::warn!("`--sql-endpoint` is ignored when using `mem` backend");
245 }
246 MetaStoreBackend::Mem
247 }
248 MetaBackend::Sql => MetaStoreBackend::Sql {
249 endpoint: opts
250 .sql_endpoint
251 .expect("sql endpoint is required")
252 .expose_secret()
253 .clone(),
254 config: meta_store_config,
255 },
256 MetaBackend::Sqlite => MetaStoreBackend::Sql {
257 endpoint: format!(
258 "sqlite://{}?mode=rwc",
259 opts.sql_endpoint
260 .expect("sql endpoint is required")
261 .expose_secret()
262 ),
263 config: meta_store_config,
264 },
265 MetaBackend::Postgres => MetaStoreBackend::Sql {
266 endpoint: format!(
267 "postgres://{}:{}@{}/{}{}",
268 opts.sql_username,
269 opts.sql_password.expose_secret(),
270 opts.sql_endpoint
271 .expect("sql endpoint is required")
272 .expose_secret(),
273 opts.sql_database,
274 if let Some(params) = &opts.sql_url_params
275 && !params.is_empty()
276 {
277 format!("?{}", params)
278 } else {
279 "".to_owned()
280 }
281 ),
282 config: meta_store_config,
283 },
284 MetaBackend::Mysql => MetaStoreBackend::Sql {
285 endpoint: format!(
286 "mysql://{}:{}@{}/{}{}",
287 opts.sql_username,
288 opts.sql_password.expose_secret(),
289 opts.sql_endpoint
290 .expect("sql endpoint is required")
291 .expose_secret(),
292 opts.sql_database,
293 if let Some(params) = &opts.sql_url_params
294 && !params.is_empty()
295 {
296 format!("?{}", params)
297 } else {
298 "".to_owned()
299 }
300 ),
301 config: meta_store_config,
302 },
303 };
304 validate_config(&config);
305
306 let total_memory_bytes = resource_util::memory::system_memory_available_bytes();
307 let heap_profiler =
308 HeapProfiler::new(total_memory_bytes, config.server.heap_profiling.clone());
309 heap_profiler.start();
311
312 let secret_store_private_key = opts
313 .secret_store_private_key_hex
314 .map(|key| hex::decode(key).unwrap());
315 let max_heartbeat_interval =
316 Duration::from_secs(config.meta.max_heartbeat_interval_secs as u64);
317 let max_idle_ms = config.meta.dangerous_max_idle_secs.unwrap_or(0) * 1000;
318 let in_flight_barrier_nums = config.streaming.in_flight_barrier_nums;
319 let privatelink_endpoint_default_tags =
320 opts.privatelink_endpoint_default_tags.map(|tags| {
321 tags.split(',')
322 .map(|s| {
323 let key_val = s.split_once('=').unwrap();
324 (key_val.0.to_owned(), key_val.1.to_owned())
325 })
326 .collect()
327 });
328
329 let add_info = AddressInfo {
330 advertise_addr: opts.advertise_addr.to_owned(),
331 listen_addr,
332 prometheus_addr,
333 dashboard_addr,
334 };
335
336 const MIN_TIMEOUT_INTERVAL_SEC: u64 = 20;
337 let compaction_task_max_progress_interval_secs = {
338 let retry_config = &config.storage.object_store.retry;
339 let max_streming_read_timeout_ms = (retry_config.streaming_read_attempt_timeout_ms
340 + retry_config.req_backoff_max_delay_ms)
341 * retry_config.streaming_read_retry_attempts as u64;
342 let max_streaming_upload_timeout_ms = (retry_config
343 .streaming_upload_attempt_timeout_ms
344 + retry_config.req_backoff_max_delay_ms)
345 * retry_config.streaming_upload_retry_attempts as u64;
346 let max_upload_timeout_ms = (retry_config.upload_attempt_timeout_ms
347 + retry_config.req_backoff_max_delay_ms)
348 * retry_config.upload_retry_attempts as u64;
349 let max_read_timeout_ms = (retry_config.read_attempt_timeout_ms
350 + retry_config.req_backoff_max_delay_ms)
351 * retry_config.read_retry_attempts as u64;
352 let max_timeout_ms = max_streming_read_timeout_ms
353 .max(max_upload_timeout_ms)
354 .max(max_streaming_upload_timeout_ms)
355 .max(max_read_timeout_ms)
356 .max(config.meta.compaction_task_max_progress_interval_secs * 1000);
357 max_timeout_ms / 1000
358 } + MIN_TIMEOUT_INTERVAL_SEC;
359
360 rpc_serve(
361 add_info,
362 backend,
363 max_heartbeat_interval,
364 config.meta.meta_leader_lease_secs,
365 MetaOpts {
366 enable_recovery: !config.meta.disable_recovery,
367 disable_automatic_parallelism_control: config
368 .meta
369 .disable_automatic_parallelism_control,
370 parallelism_control_batch_size: config.meta.parallelism_control_batch_size,
371 parallelism_control_trigger_period_sec: config
372 .meta
373 .parallelism_control_trigger_period_sec,
374 parallelism_control_trigger_first_delay_sec: config
375 .meta
376 .parallelism_control_trigger_first_delay_sec,
377 in_flight_barrier_nums,
378 max_idle_ms,
379 compaction_deterministic_test: config.meta.enable_compaction_deterministic,
380 default_parallelism: config.meta.default_parallelism,
381 vacuum_interval_sec: config.meta.vacuum_interval_sec,
382 time_travel_vacuum_interval_sec: config
383 .meta
384 .developer
385 .time_travel_vacuum_interval_sec,
386 vacuum_spin_interval_ms: config.meta.vacuum_spin_interval_ms,
387 hummock_version_checkpoint_interval_sec: config
388 .meta
389 .hummock_version_checkpoint_interval_sec,
390 enable_hummock_data_archive: config.meta.enable_hummock_data_archive,
391 hummock_time_travel_snapshot_interval: config
392 .meta
393 .hummock_time_travel_snapshot_interval,
394 hummock_time_travel_sst_info_fetch_batch_size: config
395 .meta
396 .developer
397 .hummock_time_travel_sst_info_fetch_batch_size,
398 hummock_time_travel_sst_info_insert_batch_size: config
399 .meta
400 .developer
401 .hummock_time_travel_sst_info_insert_batch_size,
402 hummock_time_travel_epoch_version_insert_batch_size: config
403 .meta
404 .developer
405 .hummock_time_travel_epoch_version_insert_batch_size,
406 hummock_gc_history_insert_batch_size: config
407 .meta
408 .developer
409 .hummock_gc_history_insert_batch_size,
410 hummock_time_travel_filter_out_objects_batch_size: config
411 .meta
412 .developer
413 .hummock_time_travel_filter_out_objects_batch_size,
414 hummock_time_travel_filter_out_objects_v1: config
415 .meta
416 .developer
417 .hummock_time_travel_filter_out_objects_v1,
418 hummock_time_travel_filter_out_objects_list_version_batch_size: config
419 .meta
420 .developer
421 .hummock_time_travel_filter_out_objects_list_version_batch_size,
422 hummock_time_travel_filter_out_objects_list_delta_batch_size: config
423 .meta
424 .developer
425 .hummock_time_travel_filter_out_objects_list_delta_batch_size,
426 min_delta_log_num_for_hummock_version_checkpoint: config
427 .meta
428 .min_delta_log_num_for_hummock_version_checkpoint,
429 min_sst_retention_time_sec: config.meta.min_sst_retention_time_sec,
430 full_gc_interval_sec: config.meta.full_gc_interval_sec,
431 full_gc_object_limit: config.meta.full_gc_object_limit,
432 gc_history_retention_time_sec: config.meta.gc_history_retention_time_sec,
433 max_inflight_time_travel_query: config.meta.max_inflight_time_travel_query,
434 enable_committed_sst_sanity_check: config.meta.enable_committed_sst_sanity_check,
435 periodic_compaction_interval_sec: config.meta.periodic_compaction_interval_sec,
436 node_num_monitor_interval_sec: config.meta.node_num_monitor_interval_sec,
437 protect_drop_table_with_incoming_sink: config
438 .meta
439 .protect_drop_table_with_incoming_sink,
440 prometheus_endpoint: opts.prometheus_endpoint,
441 prometheus_selector: opts.prometheus_selector,
442 vpc_id: opts.vpc_id,
443 security_group_id: opts.security_group_id,
444 privatelink_endpoint_default_tags,
445 periodic_space_reclaim_compaction_interval_sec: config
446 .meta
447 .periodic_space_reclaim_compaction_interval_sec,
448 telemetry_enabled: config.server.telemetry_enabled,
449 periodic_ttl_reclaim_compaction_interval_sec: config
450 .meta
451 .periodic_ttl_reclaim_compaction_interval_sec,
452 periodic_tombstone_reclaim_compaction_interval_sec: config
453 .meta
454 .periodic_tombstone_reclaim_compaction_interval_sec,
455 periodic_scheduling_compaction_group_split_interval_sec: config
456 .meta
457 .periodic_scheduling_compaction_group_split_interval_sec,
458 periodic_scheduling_compaction_group_merge_interval_sec: config
459 .meta
460 .periodic_scheduling_compaction_group_merge_interval_sec,
461 compaction_group_merge_dimension_threshold: config
462 .meta
463 .compaction_group_merge_dimension_threshold,
464 table_high_write_throughput_threshold: config
465 .meta
466 .table_high_write_throughput_threshold,
467 table_low_write_throughput_threshold: config
468 .meta
469 .table_low_write_throughput_threshold,
470 partition_vnode_count: config.meta.partition_vnode_count,
471 compact_task_table_size_partition_threshold_low: config
472 .meta
473 .compact_task_table_size_partition_threshold_low,
474 compact_task_table_size_partition_threshold_high: config
475 .meta
476 .compact_task_table_size_partition_threshold_high,
477 do_not_config_object_storage_lifecycle: config
478 .meta
479 .do_not_config_object_storage_lifecycle,
480 compaction_task_max_heartbeat_interval_secs: config
481 .meta
482 .compaction_task_max_heartbeat_interval_secs,
483 compaction_task_max_progress_interval_secs,
484 compaction_config: Some(config.meta.compaction_config),
485 hybrid_partition_node_count: config.meta.hybrid_partition_vnode_count,
486 event_log_enabled: config.meta.event_log_enabled,
487 event_log_channel_max_size: config.meta.event_log_channel_max_size,
488 advertise_addr: opts.advertise_addr,
489 cached_traces_num: config.meta.developer.cached_traces_num,
490 cached_traces_memory_limit_bytes: config
491 .meta
492 .developer
493 .cached_traces_memory_limit_bytes,
494 enable_trivial_move: config.meta.developer.enable_trivial_move,
495 enable_check_task_level_overlap: config
496 .meta
497 .developer
498 .enable_check_task_level_overlap,
499 enable_dropped_column_reclaim: config.meta.enable_dropped_column_reclaim,
500 split_group_size_ratio: config.meta.split_group_size_ratio,
501 table_stat_high_write_throughput_ratio_for_split: config
502 .meta
503 .table_stat_high_write_throughput_ratio_for_split,
504 table_stat_low_write_throughput_ratio_for_merge: config
505 .meta
506 .table_stat_low_write_throughput_ratio_for_merge,
507 table_stat_throuput_window_seconds_for_split: config
508 .meta
509 .table_stat_throuput_window_seconds_for_split,
510 table_stat_throuput_window_seconds_for_merge: config
511 .meta
512 .table_stat_throuput_window_seconds_for_merge,
513 object_store_config: config.storage.object_store,
514 max_trivial_move_task_count_per_loop: config
515 .meta
516 .developer
517 .max_trivial_move_task_count_per_loop,
518 max_get_task_probe_times: config.meta.developer.max_get_task_probe_times,
519 secret_store_private_key,
520 temp_secret_file_dir: opts.temp_secret_file_dir,
521 actor_cnt_per_worker_parallelism_hard_limit: config
522 .meta
523 .developer
524 .actor_cnt_per_worker_parallelism_hard_limit,
525 actor_cnt_per_worker_parallelism_soft_limit: config
526 .meta
527 .developer
528 .actor_cnt_per_worker_parallelism_soft_limit,
529 license_key_path: opts.license_key_path,
530 compute_client_config: config.meta.developer.compute_client_config.clone(),
531 stream_client_config: config.meta.developer.stream_client_config.clone(),
532 frontend_client_config: config.meta.developer.frontend_client_config.clone(),
533 redact_sql_option_keywords: Arc::new(
534 config
535 .batch
536 .redact_sql_option_keywords
537 .into_iter()
538 .collect(),
539 ),
540 cdc_table_split_init_sleep_interval_splits: config
541 .meta
542 .cdc_table_split_init_sleep_interval_splits,
543 cdc_table_split_init_sleep_duration_millis: config
544 .meta
545 .cdc_table_split_init_sleep_duration_millis,
546 cdc_table_split_init_insert_batch_size: config
547 .meta
548 .cdc_table_split_init_insert_batch_size,
549 },
550 config.system.into_init_system_params(),
551 Default::default(),
552 shutdown,
553 )
554 .await
555 .unwrap();
556 })
557}
558
559fn validate_config(config: &RwConfig) {
560 if config.meta.meta_leader_lease_secs <= 2 {
561 let error_msg = "meta leader lease secs should be larger than 2";
562 tracing::error!(error_msg);
563 panic!("{}", error_msg);
564 }
565
566 if config.meta.parallelism_control_batch_size == 0 {
567 let error_msg = "parallelism control batch size should be larger than 0";
568 tracing::error!(error_msg);
569 panic!("{}", error_msg);
570 }
571}