1#![feature(coverage_attribute)]
16
17mod server;
18
19use std::path::PathBuf;
20use std::time::Duration;
21
22use clap::Parser;
23use educe::Educe;
24pub use error::{MetaError, MetaResult};
25use redact::Secret;
26use risingwave_common::config::OverrideConfig;
27use risingwave_common::license::LicenseKey;
28use risingwave_common::util::meta_addr::MetaAddressStrategy;
29use risingwave_common::util::resource_util;
30use risingwave_common::util::tokio_util::sync::CancellationToken;
31use risingwave_common::{GIT_SHA, RW_VERSION};
32use risingwave_common_heap_profiling::HeapProfiler;
33use risingwave_meta::*;
34use risingwave_meta_service::*;
35pub use rpc::{ElectionClient, ElectionMember};
36use server::rpc_serve;
37pub use server::started::get as is_server_started;
38
39use crate::manager::MetaOpts;
40
// Command-line / environment options of the meta node.
//
// Every field is a `--long` CLI flag that can alternatively be supplied through the environment
// variable named in its `env = "..."` attribute. Fields marked `hide = true` are hidden from the
// generated help output.
//
// Fields carrying `#[override_opts(path = ...)]` name an entry of the loaded config.
// NOTE(review): presumably a `Some` value replaces that entry when `start` hands these options
// to `load_config` — confirm against the `OverrideConfig` derive.
//
// NOTE: plain `//` comments are used on purpose. clap's derive turns `///` doc comments into
// help text, so documenting the fields with `///` would change the `--help` output.
#[derive(Educe, Clone, Parser, OverrideConfig)]
#[educe(Debug)]
#[command(version, about = "The central metadata management service")]
pub struct MetaNodeOpts {
    // Address this node listens on. Parsed in `start` and also used by `Opts::meta_addr` to
    // build `http://<listen_addr>`. Defaults to `127.0.0.1:5690`.
    #[clap(long, env = "RW_LISTEN_ADDR", default_value = "127.0.0.1:5690")]
    pub listen_addr: String,

    // Address forwarded unchanged to both `AddressInfo` and `MetaOpts` in `start`.
    // Defaults to `127.0.0.1:5690`.
    #[clap(long, env = "RW_ADVERTISE_ADDR", default_value = "127.0.0.1:5690")]
    pub advertise_addr: String,

    // Optional address; when present it is parsed in `start` and passed on as `dashboard_addr`.
    #[clap(long, env = "RW_DASHBOARD_HOST")]
    pub dashboard_host: Option<String>,

    // Optional address; when present it is parsed in `start` and passed on as `prometheus_addr`.
    // The env variable is `RW_PROMETHEUS_HOST` and `prometheus-host` is accepted as a flag alias.
    #[clap(long, env = "RW_PROMETHEUS_HOST", alias = "prometheus-host")]
    pub prometheus_listener_addr: Option<String>,

    // Endpoint of the SQL meta store, wrapped in `Secret`. `start` panics if it is missing for
    // any backend other than `mem`; with `mem` it is ignored with a warning. How it is
    // interpreted depends on the backend: used verbatim for `sql`, embedded in
    // `sqlite://{}?mode=rwc` for `sqlite`, and placed after the `@` of the URL for
    // `postgres` / `mysql`.
    #[clap(long, hide = true, env = "RW_SQL_ENDPOINT")]
    pub sql_endpoint: Option<Secret<String>>,

    // User name placed into the connection URL; only read for the `postgres` and `mysql`
    // backends. Defaults to the empty string.
    #[clap(long, hide = true, env = "RW_SQL_USERNAME", default_value = "")]
    pub sql_username: String,

    // Password placed into the connection URL; only read for the `postgres` and `mysql`
    // backends. Wrapped in `Secret`. Defaults to the empty string.
    #[clap(long, hide = true, env = "RW_SQL_PASSWORD", default_value = "")]
    pub sql_password: Secret<String>,

    // Database name placed into the connection URL; only read for the `postgres` and `mysql`
    // backends. Defaults to the empty string.
    #[clap(long, hide = true, env = "RW_SQL_DATABASE", default_value = "")]
    pub sql_database: String,

    // Extra URL parameters; when present and non-empty they are appended as `?<params>` to the
    // `postgres` / `mysql` connection URL.
    #[clap(long, hide = true, env = "RW_SQL_URL_PARAMS")]
    pub sql_url_params: Option<String>,

    // Forwarded unchanged to `MetaOpts::prometheus_endpoint`.
    #[clap(long, env = "RW_PROMETHEUS_ENDPOINT")]
    pub prometheus_endpoint: Option<String>,

    // Forwarded unchanged to `MetaOpts::prometheus_selector`.
    #[clap(long, env = "RW_PROMETHEUS_SELECTOR")]
    pub prometheus_selector: Option<String>,

    // Comma-separated list of `key=value` pairs. `start` splits every entry at its first `=`
    // and panics if an entry contains none.
    #[clap(long, hide = true, env = "RW_PRIVATELINK_ENDPOINT_DEFAULT_TAGS")]
    pub privatelink_endpoint_default_tags: Option<String>,

    // Forwarded unchanged to `MetaOpts::vpc_id`.
    #[clap(long, hide = true, env = "RW_VPC_ID")]
    pub vpc_id: Option<String>,

    // Forwarded unchanged to `MetaOpts::security_group_id`. Note that the env variable is
    // `RW_VPC_SECURITY_GROUP_ID`.
    #[clap(long, hide = true, env = "RW_VPC_SECURITY_GROUP_ID")]
    pub security_group_id: Option<String>,

    // Path handed to `load_config` in `start`. Defaults to the empty string.
    #[clap(long, env = "RW_CONFIG_PATH", default_value = "")]
    pub config_path: String,

    // Override for config entry `meta.backend`; `start` matches on the resulting value to pick
    // the meta store backend.
    #[clap(long, hide = true, env = "RW_BACKEND", value_enum)]
    #[override_opts(path = meta.backend)]
    pub backend: Option<MetaBackend>,

    // Override for config entry `system.barrier_interval_ms`.
    #[clap(long, hide = true, env = "RW_BARRIER_INTERVAL_MS")]
    #[override_opts(path = system.barrier_interval_ms)]
    pub barrier_interval_ms: Option<u32>,

    // Override for config entry `system.sstable_size_mb`.
    #[clap(long, hide = true, env = "RW_SSTABLE_SIZE_MB")]
    #[override_opts(path = system.sstable_size_mb)]
    pub sstable_size_mb: Option<u32>,

    // Override for config entry `system.block_size_kb`.
    #[clap(long, hide = true, env = "RW_BLOCK_SIZE_KB")]
    #[override_opts(path = system.block_size_kb)]
    pub block_size_kb: Option<u32>,

    // Override for config entry `system.bloom_false_positive`.
    #[clap(long, hide = true, env = "RW_BLOOM_FALSE_POSITIVE")]
    #[override_opts(path = system.bloom_false_positive)]
    pub bloom_false_positive: Option<f64>,

    // Override for config entry `system.state_store`.
    #[clap(long, hide = true, env = "RW_STATE_STORE")]
    #[override_opts(path = system.state_store)]
    pub state_store: Option<String>,

    // Override for config entry `system.data_directory`.
    #[clap(long, hide = true, env = "RW_DATA_DIRECTORY")]
    #[override_opts(path = system.data_directory)]
    pub data_directory: Option<String>,

    // Override for config entry `meta.do_not_config_object_storage_lifecycle`. Note that the env
    // variable is named `RW_DO_NOT_CONFIG_BUCKET_LIFECYCLE`.
    #[clap(long, hide = true, env = "RW_DO_NOT_CONFIG_BUCKET_LIFECYCLE")]
    #[override_opts(path = meta.do_not_config_object_storage_lifecycle)]
    pub do_not_config_object_storage_lifecycle: Option<bool>,

    // Override for config entry `system.backup_storage_url`.
    #[clap(long, hide = true, env = "RW_BACKUP_STORAGE_URL")]
    #[override_opts(path = system.backup_storage_url)]
    pub backup_storage_url: Option<String>,

    // Override for config entry `system.backup_storage_directory`.
    #[clap(long, hide = true, env = "RW_BACKUP_STORAGE_DIRECTORY")]
    #[override_opts(path = system.backup_storage_directory)]
    pub backup_storage_directory: Option<String>,

    // Override for config entry `server.heap_profiling.dir`.
    #[clap(long, hide = true, env = "RW_HEAP_PROFILING_DIR")]
    #[override_opts(path = server.heap_profiling.dir)]
    pub heap_profiling_dir: Option<String>,

    // Override for config entry `meta.dangerous_max_idle_secs`; `start` converts the configured
    // value to milliseconds (0 when unset).
    #[clap(long, hide = true, env = "RW_DANGEROUS_MAX_IDLE_SECS")]
    #[override_opts(path = meta.dangerous_max_idle_secs)]
    pub dangerous_max_idle_secs: Option<u64>,

    // Deprecated option; it is still accepted on the command line but `start` never reads it.
    #[deprecated = "connector node has been deprecated."]
    #[clap(long, hide = true, env = "RW_CONNECTOR_RPC_ENDPOINT")]
    pub connector_rpc_endpoint: Option<String>,

    // Override for config entry `system.license_key`.
    #[clap(long, hide = true, env = "RW_LICENSE_KEY")]
    #[override_opts(path = system.license_key)]
    pub license_key: Option<LicenseKey>,

    // Forwarded unchanged to `MetaOpts::license_key_path`.
    #[clap(long, env = "RW_LICENSE_KEY_PATH")]
    pub license_key_path: Option<PathBuf>,

    // Hex-encoded key; `start` decodes it with `hex::decode` and panics on invalid hex.
    // `educe(Debug(ignore))` keeps the value out of the derived `Debug` output, which `start`
    // writes to the log.
    #[educe(Debug(ignore))] #[clap(long, hide = true, env = "RW_SECRET_STORE_PRIVATE_KEY_HEX")]
    pub secret_store_private_key_hex: Option<String>,

    // Forwarded unchanged to `MetaOpts::temp_secret_file_dir`. Defaults to `./secrets`.
    #[clap(
        long,
        hide = true,
        env = "RW_TEMP_SECRET_FILE_DIR",
        default_value = "./secrets"
    )]
    pub temp_secret_file_dir: String,
}
203
204impl risingwave_common::opts::Opts for MetaNodeOpts {
205 fn name() -> &'static str {
206 "meta"
207 }
208
209 fn meta_addr(&self) -> MetaAddressStrategy {
210 format!("http://{}", self.listen_addr)
211 .parse()
212 .expect("invalid listen address")
213 }
214}
215
216use std::future::Future;
217use std::pin::Pin;
218use std::sync::Arc;
219
220use risingwave_common::config::{MetaBackend, RwConfig, load_config};
221use tracing::info;
222
223pub fn start(
225 opts: MetaNodeOpts,
226 shutdown: CancellationToken,
227) -> Pin<Box<dyn Future<Output = ()> + Send>> {
228 Box::pin(async move {
231 info!("Starting meta node");
232 info!("> options: {:?}", opts);
233 let config = load_config(&opts.config_path, &opts);
234 info!("> config: {:?}", config);
235 info!("> version: {} ({})", RW_VERSION, GIT_SHA);
236 let listen_addr = opts.listen_addr.parse().unwrap();
237 let dashboard_addr = opts.dashboard_host.map(|x| x.parse().unwrap());
238 let prometheus_addr = opts.prometheus_listener_addr.map(|x| x.parse().unwrap());
239 let meta_store_config = config.meta.meta_store_config.clone();
240 let backend = match config.meta.backend {
241 MetaBackend::Mem => {
242 if opts.sql_endpoint.is_some() {
243 tracing::warn!("`--sql-endpoint` is ignored when using `mem` backend");
244 }
245 MetaStoreBackend::Mem
246 }
247 MetaBackend::Sql => MetaStoreBackend::Sql {
248 endpoint: opts
249 .sql_endpoint
250 .expect("sql endpoint is required")
251 .expose_secret()
252 .clone(),
253 config: meta_store_config,
254 },
255 MetaBackend::Sqlite => MetaStoreBackend::Sql {
256 endpoint: format!(
257 "sqlite://{}?mode=rwc",
258 opts.sql_endpoint
259 .expect("sql endpoint is required")
260 .expose_secret()
261 ),
262 config: meta_store_config,
263 },
264 MetaBackend::Postgres => MetaStoreBackend::Sql {
265 endpoint: format!(
266 "postgres://{}:{}@{}/{}{}",
267 opts.sql_username,
268 opts.sql_password.expose_secret(),
269 opts.sql_endpoint
270 .expect("sql endpoint is required")
271 .expose_secret(),
272 opts.sql_database,
273 if let Some(params) = &opts.sql_url_params
274 && !params.is_empty()
275 {
276 format!("?{}", params)
277 } else {
278 "".to_owned()
279 }
280 ),
281 config: meta_store_config,
282 },
283 MetaBackend::Mysql => MetaStoreBackend::Sql {
284 endpoint: format!(
285 "mysql://{}:{}@{}/{}{}",
286 opts.sql_username,
287 opts.sql_password.expose_secret(),
288 opts.sql_endpoint
289 .expect("sql endpoint is required")
290 .expose_secret(),
291 opts.sql_database,
292 if let Some(params) = &opts.sql_url_params
293 && !params.is_empty()
294 {
295 format!("?{}", params)
296 } else {
297 "".to_owned()
298 }
299 ),
300 config: meta_store_config,
301 },
302 };
303 validate_config(&config);
304
305 let total_memory_bytes = resource_util::memory::system_memory_available_bytes();
306 let heap_profiler =
307 HeapProfiler::new(total_memory_bytes, config.server.heap_profiling.clone());
308 heap_profiler.start();
310
311 let secret_store_private_key = opts
312 .secret_store_private_key_hex
313 .map(|key| hex::decode(key).unwrap());
314 let max_heartbeat_interval =
315 Duration::from_secs(config.meta.max_heartbeat_interval_secs as u64);
316 let max_idle_ms = config.meta.dangerous_max_idle_secs.unwrap_or(0) * 1000;
317 let in_flight_barrier_nums = config.streaming.in_flight_barrier_nums;
318 let privatelink_endpoint_default_tags =
319 opts.privatelink_endpoint_default_tags.map(|tags| {
320 tags.split(',')
321 .map(|s| {
322 let key_val = s.split_once('=').unwrap();
323 (key_val.0.to_owned(), key_val.1.to_owned())
324 })
325 .collect()
326 });
327
328 let add_info = AddressInfo {
329 advertise_addr: opts.advertise_addr.clone(),
330 listen_addr,
331 prometheus_addr,
332 dashboard_addr,
333 };
334
335 const MIN_TIMEOUT_INTERVAL_SEC: u64 = 20;
336 let compaction_task_max_progress_interval_secs = {
337 let retry_config = &config.storage.object_store.retry;
338 let max_streaming_read_timeout_ms = (retry_config.streaming_read_attempt_timeout_ms
339 + retry_config.req_backoff_max_delay_ms)
340 * retry_config.streaming_read_retry_attempts as u64;
341 let max_streaming_upload_timeout_ms = (retry_config
342 .streaming_upload_attempt_timeout_ms
343 + retry_config.req_backoff_max_delay_ms)
344 * retry_config.streaming_upload_retry_attempts as u64;
345 let max_upload_timeout_ms = (retry_config.upload_attempt_timeout_ms
346 + retry_config.req_backoff_max_delay_ms)
347 * retry_config.upload_retry_attempts as u64;
348 let max_read_timeout_ms = (retry_config.read_attempt_timeout_ms
349 + retry_config.req_backoff_max_delay_ms)
350 * retry_config.read_retry_attempts as u64;
351 let max_timeout_ms = max_streaming_read_timeout_ms
352 .max(max_upload_timeout_ms)
353 .max(max_streaming_upload_timeout_ms)
354 .max(max_read_timeout_ms)
355 .max(config.meta.compaction_task_max_progress_interval_secs * 1000);
356 max_timeout_ms / 1000
357 } + MIN_TIMEOUT_INTERVAL_SEC;
358
359 rpc_serve(
360 add_info,
361 backend,
362 max_heartbeat_interval,
363 config.meta.meta_leader_lease_secs,
364 MetaOpts {
365 enable_recovery: !config.meta.disable_recovery,
366 disable_automatic_parallelism_control: config
367 .meta
368 .disable_automatic_parallelism_control,
369 parallelism_control_batch_size: config.meta.parallelism_control_batch_size,
370 parallelism_control_trigger_period_sec: config
371 .meta
372 .parallelism_control_trigger_period_sec,
373 parallelism_control_trigger_first_delay_sec: config
374 .meta
375 .parallelism_control_trigger_first_delay_sec,
376 in_flight_barrier_nums,
377 max_idle_ms,
378 compaction_deterministic_test: config.meta.enable_compaction_deterministic,
379 default_parallelism: config.meta.default_parallelism,
380 vacuum_interval_sec: config.meta.vacuum_interval_sec,
381 time_travel_vacuum_interval_sec: config
382 .meta
383 .developer
384 .time_travel_vacuum_interval_sec,
385 vacuum_spin_interval_ms: config.meta.vacuum_spin_interval_ms,
386 iceberg_gc_interval_sec: config.meta.iceberg_gc_interval_sec,
387 hummock_version_checkpoint_interval_sec: config
388 .meta
389 .hummock_version_checkpoint_interval_sec,
390 enable_hummock_data_archive: config.meta.enable_hummock_data_archive,
391 hummock_time_travel_snapshot_interval: config
392 .meta
393 .hummock_time_travel_snapshot_interval,
394 hummock_time_travel_sst_info_fetch_batch_size: config
395 .meta
396 .developer
397 .hummock_time_travel_sst_info_fetch_batch_size,
398 hummock_time_travel_sst_info_insert_batch_size: config
399 .meta
400 .developer
401 .hummock_time_travel_sst_info_insert_batch_size,
402 hummock_time_travel_epoch_version_insert_batch_size: config
403 .meta
404 .developer
405 .hummock_time_travel_epoch_version_insert_batch_size,
406 hummock_gc_history_insert_batch_size: config
407 .meta
408 .developer
409 .hummock_gc_history_insert_batch_size,
410 hummock_time_travel_filter_out_objects_batch_size: config
411 .meta
412 .developer
413 .hummock_time_travel_filter_out_objects_batch_size,
414 hummock_time_travel_filter_out_objects_v1: config
415 .meta
416 .developer
417 .hummock_time_travel_filter_out_objects_v1,
418 hummock_time_travel_filter_out_objects_list_version_batch_size: config
419 .meta
420 .developer
421 .hummock_time_travel_filter_out_objects_list_version_batch_size,
422 hummock_time_travel_filter_out_objects_list_delta_batch_size: config
423 .meta
424 .developer
425 .hummock_time_travel_filter_out_objects_list_delta_batch_size,
426 min_delta_log_num_for_hummock_version_checkpoint: config
427 .meta
428 .min_delta_log_num_for_hummock_version_checkpoint,
429 min_sst_retention_time_sec: config.meta.min_sst_retention_time_sec,
430 full_gc_interval_sec: config.meta.full_gc_interval_sec,
431 full_gc_object_limit: config.meta.full_gc_object_limit,
432 gc_history_retention_time_sec: config.meta.gc_history_retention_time_sec,
433 max_inflight_time_travel_query: config.meta.max_inflight_time_travel_query,
434 enable_committed_sst_sanity_check: config.meta.enable_committed_sst_sanity_check,
435 periodic_compaction_interval_sec: config.meta.periodic_compaction_interval_sec,
436 node_num_monitor_interval_sec: config.meta.node_num_monitor_interval_sec,
437 protect_drop_table_with_incoming_sink: config
438 .meta
439 .protect_drop_table_with_incoming_sink,
440 prometheus_endpoint: opts.prometheus_endpoint,
441 prometheus_selector: opts.prometheus_selector,
442 vpc_id: opts.vpc_id,
443 security_group_id: opts.security_group_id,
444 privatelink_endpoint_default_tags,
445 periodic_space_reclaim_compaction_interval_sec: config
446 .meta
447 .periodic_space_reclaim_compaction_interval_sec,
448 telemetry_enabled: config.server.telemetry_enabled,
449 periodic_ttl_reclaim_compaction_interval_sec: config
450 .meta
451 .periodic_ttl_reclaim_compaction_interval_sec,
452 periodic_tombstone_reclaim_compaction_interval_sec: config
453 .meta
454 .periodic_tombstone_reclaim_compaction_interval_sec,
455 periodic_scheduling_compaction_group_split_interval_sec: config
456 .meta
457 .periodic_scheduling_compaction_group_split_interval_sec,
458 periodic_scheduling_compaction_group_merge_interval_sec: config
459 .meta
460 .periodic_scheduling_compaction_group_merge_interval_sec,
461 compaction_group_merge_dimension_threshold: config
462 .meta
463 .compaction_group_merge_dimension_threshold,
464 table_high_write_throughput_threshold: config
465 .meta
466 .table_high_write_throughput_threshold,
467 table_low_write_throughput_threshold: config
468 .meta
469 .table_low_write_throughput_threshold,
470 partition_vnode_count: config.meta.partition_vnode_count,
471 compact_task_table_size_partition_threshold_low: config
472 .meta
473 .compact_task_table_size_partition_threshold_low,
474 compact_task_table_size_partition_threshold_high: config
475 .meta
476 .compact_task_table_size_partition_threshold_high,
477 do_not_config_object_storage_lifecycle: config
478 .meta
479 .do_not_config_object_storage_lifecycle,
480 compaction_task_max_heartbeat_interval_secs: config
481 .meta
482 .compaction_task_max_heartbeat_interval_secs,
483 compaction_task_max_progress_interval_secs,
484 compaction_config: Some(config.meta.compaction_config),
485 hybrid_partition_node_count: config.meta.hybrid_partition_vnode_count,
486 event_log_enabled: config.meta.event_log_enabled,
487 event_log_channel_max_size: config.meta.event_log_channel_max_size,
488 advertise_addr: opts.advertise_addr,
489 cached_traces_num: config.meta.developer.cached_traces_num,
490 cached_traces_memory_limit_bytes: config
491 .meta
492 .developer
493 .cached_traces_memory_limit_bytes,
494 enable_trivial_move: config.meta.developer.enable_trivial_move,
495 enable_check_task_level_overlap: config
496 .meta
497 .developer
498 .enable_check_task_level_overlap,
499 enable_dropped_column_reclaim: config.meta.enable_dropped_column_reclaim,
500 split_group_size_ratio: config.meta.split_group_size_ratio,
501 table_stat_high_write_throughput_ratio_for_split: config
502 .meta
503 .table_stat_high_write_throughput_ratio_for_split,
504 table_stat_low_write_throughput_ratio_for_merge: config
505 .meta
506 .table_stat_low_write_throughput_ratio_for_merge,
507 table_stat_throuput_window_seconds_for_split: config
508 .meta
509 .table_stat_throuput_window_seconds_for_split,
510 table_stat_throuput_window_seconds_for_merge: config
511 .meta
512 .table_stat_throuput_window_seconds_for_merge,
513 object_store_config: config.storage.object_store,
514 max_trivial_move_task_count_per_loop: config
515 .meta
516 .developer
517 .max_trivial_move_task_count_per_loop,
518 max_get_task_probe_times: config.meta.developer.max_get_task_probe_times,
519 secret_store_private_key,
520 temp_secret_file_dir: opts.temp_secret_file_dir,
521 actor_cnt_per_worker_parallelism_hard_limit: config
522 .meta
523 .developer
524 .actor_cnt_per_worker_parallelism_hard_limit,
525 actor_cnt_per_worker_parallelism_soft_limit: config
526 .meta
527 .developer
528 .actor_cnt_per_worker_parallelism_soft_limit,
529 license_key_path: opts.license_key_path,
530 compute_client_config: config.meta.developer.compute_client_config.clone(),
531 stream_client_config: config.meta.developer.stream_client_config.clone(),
532 frontend_client_config: config.meta.developer.frontend_client_config.clone(),
533 redact_sql_option_keywords: Arc::new(
534 config
535 .batch
536 .redact_sql_option_keywords
537 .into_iter()
538 .collect(),
539 ),
540 cdc_table_split_init_sleep_interval_splits: config
541 .meta
542 .cdc_table_split_init_sleep_interval_splits,
543 cdc_table_split_init_sleep_duration_millis: config
544 .meta
545 .cdc_table_split_init_sleep_duration_millis,
546 cdc_table_split_init_insert_batch_size: config
547 .meta
548 .cdc_table_split_init_insert_batch_size,
549 },
550 config.system.into_init_system_params(),
551 Default::default(),
552 shutdown,
553 )
554 .await
555 .unwrap();
556 })
557}
558
559fn validate_config(config: &RwConfig) {
560 if config.meta.meta_leader_lease_secs <= 2 {
561 let error_msg = "meta leader lease secs should be larger than 2";
562 tracing::error!(error_msg);
563 panic!("{}", error_msg);
564 }
565
566 if config.meta.parallelism_control_batch_size == 0 {
567 let error_msg = "parallelism control batch size should be larger than 0";
568 tracing::error!(error_msg);
569 panic!("{}", error_msg);
570 }
571}