1#![feature(let_chains)]
16#![feature(coverage_attribute)]
17
18mod server;
19
20use std::path::PathBuf;
21use std::time::Duration;
22
23use clap::Parser;
24use educe::Educe;
25pub use error::{MetaError, MetaResult};
26use redact::Secret;
27use risingwave_common::config::OverrideConfig;
28use risingwave_common::license::LicenseKey;
29use risingwave_common::util::meta_addr::MetaAddressStrategy;
30use risingwave_common::util::resource_util;
31use risingwave_common::util::tokio_util::sync::CancellationToken;
32use risingwave_common::{GIT_SHA, RW_VERSION};
33use risingwave_common_heap_profiling::HeapProfiler;
34use risingwave_meta::*;
35use risingwave_meta_service::*;
36pub use rpc::{ElectionClient, ElectionMember};
37use server::rpc_serve;
38pub use server::started::get as is_server_started;
39
40use crate::manager::MetaOpts;
41
// Command-line options of the meta node.
//
// Every option is a `--long-flag` and can also be supplied through the environment variable
// named in its `env = "..."` attribute. Fields carrying `#[override_opts(path = ...)]` name the
// config entry they correspond to; the whole struct is handed to `load_config` in `start`
// together with `config_path` (presumably a set value overrides that entry — confirm against
// the `OverrideConfig` derive).
//
// NOTE(review): plain `//` comments are used on purpose. clap's derive turns `///` doc comments
// into `--help` text, so switching comment style here would change the CLI output.
#[derive(Educe, Clone, Parser, OverrideConfig)]
#[educe(Debug)]
#[command(version, about = "The central metadata management service")]
pub struct MetaNodeOpts {
    // Address the meta node listens on. Parsed in `start` into `AddressInfo::listen_addr`, and
    // also used to build the `http://...` URL returned by `Opts::meta_addr`.
    #[clap(long, env = "RW_LISTEN_ADDR", default_value = "127.0.0.1:5690")]
    pub listen_addr: String,

    // Advertised address. Forwarded as a string to both `AddressInfo::advertise_addr` and
    // `MetaOpts::advertise_addr` in `start`; it is not parsed in this file.
    #[clap(long, env = "RW_ADVERTISE_ADDR", default_value = "127.0.0.1:5690")]
    pub advertise_addr: String,

    // Optional dashboard address. When present it is parsed in `start` and becomes
    // `AddressInfo::dashboard_addr`.
    #[clap(long, env = "RW_DASHBOARD_HOST")]
    pub dashboard_host: Option<String>,

    // Optional address that becomes `AddressInfo::prometheus_addr` after parsing in `start`.
    // The flag also answers to the alias `prometheus-host`; the env var is `RW_PROMETHEUS_HOST`.
    #[clap(long, env = "RW_PROMETHEUS_HOST", alias = "prometheus-host")]
    pub prometheus_listener_addr: Option<String>,

    // Endpoint of the SQL meta store, wrapped in `Secret`. `start` requires it (and panics
    // without it) for the `Sql`, `Sqlite`, `Postgres` and `Mysql` backends, and logs a warning
    // that it is ignored for `Mem`. How it is used depends on the backend:
    //   - `Sql`: taken verbatim as the endpoint;
    //   - `Sqlite`: spliced into `sqlite://{}?mode=rwc`;
    //   - `Postgres` / `Mysql`: the part after `@` in `<scheme>://user:password@{}/database`.
    #[clap(long, hide = true, env = "RW_SQL_ENDPOINT")]
    pub sql_endpoint: Option<Secret<String>>,

    // User name spliced into the `Postgres` / `Mysql` endpoint URL in `start`. Defaults to "".
    #[clap(long, hide = true, env = "RW_SQL_USERNAME", default_value = "")]
    pub sql_username: String,

    // Password spliced into the `Postgres` / `Mysql` endpoint URL in `start`, wrapped in
    // `Secret`. Defaults to "".
    #[clap(long, hide = true, env = "RW_SQL_PASSWORD", default_value = "")]
    pub sql_password: Secret<String>,

    // Database name spliced into the `Postgres` / `Mysql` endpoint URL in `start`.
    // Defaults to "".
    #[clap(long, hide = true, env = "RW_SQL_DATABASE", default_value = "")]
    pub sql_database: String,

    // Forwarded unchanged to `MetaOpts::prometheus_endpoint`.
    #[clap(long, env = "RW_PROMETHEUS_ENDPOINT")]
    pub prometheus_endpoint: Option<String>,

    // Forwarded unchanged to `MetaOpts::prometheus_selector`.
    #[clap(long, env = "RW_PROMETHEUS_SELECTOR")]
    pub prometheus_selector: Option<String>,

    // Comma-separated list of `key=value` pairs. `start` splits it on `,` and each entry on the
    // first `=`, and passes the collected pairs to `MetaOpts::privatelink_endpoint_default_tags`.
    #[clap(long, hide = true, env = "RW_PRIVATELINK_ENDPOINT_DEFAULT_TAGS")]
    pub privatelink_endpoint_default_tags: Option<String>,

    // Forwarded unchanged to `MetaOpts::vpc_id`.
    #[clap(long, hide = true, env = "RW_VPC_ID")]
    pub vpc_id: Option<String>,

    // Forwarded unchanged to `MetaOpts::security_group_id`. Note that the env var is
    // `RW_VPC_SECURITY_GROUP_ID` while the flag is `--security-group-id`.
    #[clap(long, hide = true, env = "RW_VPC_SECURITY_GROUP_ID")]
    pub security_group_id: Option<String>,

    // Path passed as the first argument to `load_config` in `start`. Defaults to "".
    #[clap(long, env = "RW_CONFIG_PATH", default_value = "")]
    pub config_path: String,

    // Corresponds to config entry `meta.backend`; `start` matches on the loaded
    // `config.meta.backend` to choose the meta store.
    #[clap(long, hide = true, env = "RW_BACKEND", value_enum)]
    #[override_opts(path = meta.backend)]
    pub backend: Option<MetaBackend>,

    // Corresponds to config entry `system.barrier_interval_ms`.
    #[clap(long, hide = true, env = "RW_BARRIER_INTERVAL_MS")]
    #[override_opts(path = system.barrier_interval_ms)]
    pub barrier_interval_ms: Option<u32>,

    // Corresponds to config entry `system.sstable_size_mb`.
    #[clap(long, hide = true, env = "RW_SSTABLE_SIZE_MB")]
    #[override_opts(path = system.sstable_size_mb)]
    pub sstable_size_mb: Option<u32>,

    // Corresponds to config entry `system.block_size_kb`.
    #[clap(long, hide = true, env = "RW_BLOCK_SIZE_KB")]
    #[override_opts(path = system.block_size_kb)]
    pub block_size_kb: Option<u32>,

    // Corresponds to config entry `system.bloom_false_positive`.
    #[clap(long, hide = true, env = "RW_BLOOM_FALSE_POSITIVE")]
    #[override_opts(path = system.bloom_false_positive)]
    pub bloom_false_positive: Option<f64>,

    // Corresponds to config entry `system.state_store`.
    #[clap(long, hide = true, env = "RW_STATE_STORE")]
    #[override_opts(path = system.state_store)]
    pub state_store: Option<String>,

    // Corresponds to config entry `system.data_directory`.
    #[clap(long, hide = true, env = "RW_DATA_DIRECTORY")]
    #[override_opts(path = system.data_directory)]
    pub data_directory: Option<String>,

    // Corresponds to config entry `meta.do_not_config_object_storage_lifecycle`. Note that the
    // env var is named `RW_DO_NOT_CONFIG_BUCKET_LIFECYCLE`.
    #[clap(long, hide = true, env = "RW_DO_NOT_CONFIG_BUCKET_LIFECYCLE")]
    #[override_opts(path = meta.do_not_config_object_storage_lifecycle)]
    pub do_not_config_object_storage_lifecycle: Option<bool>,

    // Corresponds to config entry `system.backup_storage_url`.
    #[clap(long, hide = true, env = "RW_BACKUP_STORAGE_URL")]
    #[override_opts(path = system.backup_storage_url)]
    pub backup_storage_url: Option<String>,

    // Corresponds to config entry `system.backup_storage_directory`.
    #[clap(long, hide = true, env = "RW_BACKUP_STORAGE_DIRECTORY")]
    #[override_opts(path = system.backup_storage_directory)]
    pub backup_storage_directory: Option<String>,

    // Corresponds to config entry `server.heap_profiling.dir`; `start` hands
    // `config.server.heap_profiling` to the heap profiler.
    #[clap(long, hide = true, env = "RW_HEAP_PROFILING_DIR")]
    #[override_opts(path = server.heap_profiling.dir)]
    pub heap_profiling_dir: Option<String>,

    // Corresponds to config entry `meta.dangerous_max_idle_secs`; `start` converts the loaded
    // value to milliseconds (0 when unset) for `MetaOpts::max_idle_ms`.
    #[clap(long, hide = true, env = "RW_DANGEROUS_MAX_IDLE_SECS")]
    #[override_opts(path = meta.dangerous_max_idle_secs)]
    pub dangerous_max_idle_secs: Option<u64>,

    // Deprecated option that is still accepted on the command line; `start` never reads it.
    #[deprecated = "connector node has been deprecated."]
    #[clap(long, hide = true, env = "RW_CONNECTOR_RPC_ENDPOINT")]
    pub connector_rpc_endpoint: Option<String>,

    // Corresponds to config entry `system.license_key`.
    #[clap(long, hide = true, env = "RW_LICENSE_KEY")]
    #[override_opts(path = system.license_key)]
    pub license_key: Option<LicenseKey>,

    // Forwarded unchanged to `MetaOpts::license_key_path`.
    #[clap(long, env = "RW_LICENSE_KEY_PATH")]
    pub license_key_path: Option<PathBuf>,

    // Hex string that `start` decodes into the bytes passed as
    // `MetaOpts::secret_store_private_key`. `Debug(ignore)` keeps it out of the derived `Debug`
    // output, which `start` writes to the log.
    #[educe(Debug(ignore))] #[clap(long, hide = true, env = "RW_SECRET_STORE_PRIVATE_KEY_HEX")]
    pub secret_store_private_key_hex: Option<String>,

    // Forwarded unchanged to `MetaOpts::temp_secret_file_dir`. Defaults to "./secrets".
    #[clap(
        long,
        hide = true,
        env = "RW_TEMP_SECRET_FILE_DIR",
        default_value = "./secrets"
    )]
    pub temp_secret_file_dir: String,
}
199
200impl risingwave_common::opts::Opts for MetaNodeOpts {
201 fn name() -> &'static str {
202 "meta"
203 }
204
205 fn meta_addr(&self) -> MetaAddressStrategy {
206 format!("http://{}", self.listen_addr)
207 .parse()
208 .expect("invalid listen address")
209 }
210}
211
212use std::future::Future;
213use std::pin::Pin;
214use std::sync::Arc;
215
216use risingwave_common::config::{MetaBackend, RwConfig, load_config};
217use tracing::info;
218
219pub fn start(
221 opts: MetaNodeOpts,
222 shutdown: CancellationToken,
223) -> Pin<Box<dyn Future<Output = ()> + Send>> {
224 Box::pin(async move {
227 info!("Starting meta node");
228 info!("> options: {:?}", opts);
229 let config = load_config(&opts.config_path, &opts);
230 info!("> config: {:?}", config);
231 info!("> version: {} ({})", RW_VERSION, GIT_SHA);
232 let listen_addr = opts.listen_addr.parse().unwrap();
233 let dashboard_addr = opts.dashboard_host.map(|x| x.parse().unwrap());
234 let prometheus_addr = opts.prometheus_listener_addr.map(|x| x.parse().unwrap());
235 let meta_store_config = config.meta.meta_store_config.clone();
236 let backend = match config.meta.backend {
237 MetaBackend::Mem => {
238 if opts.sql_endpoint.is_some() {
239 tracing::warn!("`--sql-endpoint` is ignored when using `mem` backend");
240 }
241 MetaStoreBackend::Mem
242 }
243 MetaBackend::Sql => MetaStoreBackend::Sql {
244 endpoint: opts
245 .sql_endpoint
246 .expect("sql endpoint is required")
247 .expose_secret()
248 .clone(),
249 config: meta_store_config,
250 },
251 MetaBackend::Sqlite => MetaStoreBackend::Sql {
252 endpoint: format!(
253 "sqlite://{}?mode=rwc",
254 opts.sql_endpoint
255 .expect("sql endpoint is required")
256 .expose_secret()
257 ),
258 config: meta_store_config,
259 },
260 MetaBackend::Postgres => MetaStoreBackend::Sql {
261 endpoint: format!(
262 "postgres://{}:{}@{}/{}",
263 opts.sql_username,
264 opts.sql_password.expose_secret(),
265 opts.sql_endpoint
266 .expect("sql endpoint is required")
267 .expose_secret(),
268 opts.sql_database
269 ),
270 config: meta_store_config,
271 },
272 MetaBackend::Mysql => MetaStoreBackend::Sql {
273 endpoint: format!(
274 "mysql://{}:{}@{}/{}",
275 opts.sql_username,
276 opts.sql_password.expose_secret(),
277 opts.sql_endpoint
278 .expect("sql endpoint is required")
279 .expose_secret(),
280 opts.sql_database
281 ),
282 config: meta_store_config,
283 },
284 };
285 validate_config(&config);
286
287 let total_memory_bytes = resource_util::memory::system_memory_available_bytes();
288 let heap_profiler =
289 HeapProfiler::new(total_memory_bytes, config.server.heap_profiling.clone());
290 heap_profiler.start();
292
293 let secret_store_private_key = opts
294 .secret_store_private_key_hex
295 .map(|key| hex::decode(key).unwrap());
296 let max_heartbeat_interval =
297 Duration::from_secs(config.meta.max_heartbeat_interval_secs as u64);
298 let max_idle_ms = config.meta.dangerous_max_idle_secs.unwrap_or(0) * 1000;
299 let in_flight_barrier_nums = config.streaming.in_flight_barrier_nums;
300 let privatelink_endpoint_default_tags =
301 opts.privatelink_endpoint_default_tags.map(|tags| {
302 tags.split(',')
303 .map(|s| {
304 let key_val = s.split_once('=').unwrap();
305 (key_val.0.to_owned(), key_val.1.to_owned())
306 })
307 .collect()
308 });
309
310 let add_info = AddressInfo {
311 advertise_addr: opts.advertise_addr.to_owned(),
312 listen_addr,
313 prometheus_addr,
314 dashboard_addr,
315 };
316
317 const MIN_TIMEOUT_INTERVAL_SEC: u64 = 20;
318 let compaction_task_max_progress_interval_secs = {
319 let retry_config = &config.storage.object_store.retry;
320 let max_streming_read_timeout_ms = (retry_config.streaming_read_attempt_timeout_ms
321 + retry_config.req_backoff_max_delay_ms)
322 * retry_config.streaming_read_retry_attempts as u64;
323 let max_streaming_upload_timeout_ms = (retry_config
324 .streaming_upload_attempt_timeout_ms
325 + retry_config.req_backoff_max_delay_ms)
326 * retry_config.streaming_upload_retry_attempts as u64;
327 let max_upload_timeout_ms = (retry_config.upload_attempt_timeout_ms
328 + retry_config.req_backoff_max_delay_ms)
329 * retry_config.upload_retry_attempts as u64;
330 let max_read_timeout_ms = (retry_config.read_attempt_timeout_ms
331 + retry_config.req_backoff_max_delay_ms)
332 * retry_config.read_retry_attempts as u64;
333 let max_timeout_ms = max_streming_read_timeout_ms
334 .max(max_upload_timeout_ms)
335 .max(max_streaming_upload_timeout_ms)
336 .max(max_read_timeout_ms)
337 .max(config.meta.compaction_task_max_progress_interval_secs * 1000);
338 max_timeout_ms / 1000
339 } + MIN_TIMEOUT_INTERVAL_SEC;
340
341 rpc_serve(
342 add_info,
343 backend,
344 max_heartbeat_interval,
345 config.meta.meta_leader_lease_secs,
346 MetaOpts {
347 enable_recovery: !config.meta.disable_recovery,
348 disable_automatic_parallelism_control: config
349 .meta
350 .disable_automatic_parallelism_control,
351 parallelism_control_batch_size: config.meta.parallelism_control_batch_size,
352 parallelism_control_trigger_period_sec: config
353 .meta
354 .parallelism_control_trigger_period_sec,
355 parallelism_control_trigger_first_delay_sec: config
356 .meta
357 .parallelism_control_trigger_first_delay_sec,
358 in_flight_barrier_nums,
359 max_idle_ms,
360 compaction_deterministic_test: config.meta.enable_compaction_deterministic,
361 default_parallelism: config.meta.default_parallelism,
362 vacuum_interval_sec: config.meta.vacuum_interval_sec,
363 time_travel_vacuum_interval_sec: config
364 .meta
365 .developer
366 .time_travel_vacuum_interval_sec,
367 vacuum_spin_interval_ms: config.meta.vacuum_spin_interval_ms,
368 hummock_version_checkpoint_interval_sec: config
369 .meta
370 .hummock_version_checkpoint_interval_sec,
371 enable_hummock_data_archive: config.meta.enable_hummock_data_archive,
372 hummock_time_travel_snapshot_interval: config
373 .meta
374 .hummock_time_travel_snapshot_interval,
375 hummock_time_travel_sst_info_fetch_batch_size: config
376 .meta
377 .developer
378 .hummock_time_travel_sst_info_fetch_batch_size,
379 hummock_time_travel_sst_info_insert_batch_size: config
380 .meta
381 .developer
382 .hummock_time_travel_sst_info_insert_batch_size,
383 hummock_time_travel_epoch_version_insert_batch_size: config
384 .meta
385 .developer
386 .hummock_time_travel_epoch_version_insert_batch_size,
387 hummock_gc_history_insert_batch_size: config
388 .meta
389 .developer
390 .hummock_gc_history_insert_batch_size,
391 hummock_time_travel_filter_out_objects_batch_size: config
392 .meta
393 .developer
394 .hummock_time_travel_filter_out_objects_batch_size,
395 hummock_time_travel_filter_out_objects_v1: config
396 .meta
397 .developer
398 .hummock_time_travel_filter_out_objects_v1,
399 hummock_time_travel_filter_out_objects_list_version_batch_size: config
400 .meta
401 .developer
402 .hummock_time_travel_filter_out_objects_list_version_batch_size,
403 hummock_time_travel_filter_out_objects_list_delta_batch_size: config
404 .meta
405 .developer
406 .hummock_time_travel_filter_out_objects_list_delta_batch_size,
407 min_delta_log_num_for_hummock_version_checkpoint: config
408 .meta
409 .min_delta_log_num_for_hummock_version_checkpoint,
410 min_sst_retention_time_sec: config.meta.min_sst_retention_time_sec,
411 full_gc_interval_sec: config.meta.full_gc_interval_sec,
412 full_gc_object_limit: config.meta.full_gc_object_limit,
413 gc_history_retention_time_sec: config.meta.gc_history_retention_time_sec,
414 max_inflight_time_travel_query: config.meta.max_inflight_time_travel_query,
415 enable_committed_sst_sanity_check: config.meta.enable_committed_sst_sanity_check,
416 periodic_compaction_interval_sec: config.meta.periodic_compaction_interval_sec,
417 node_num_monitor_interval_sec: config.meta.node_num_monitor_interval_sec,
418 protect_drop_table_with_incoming_sink: config
419 .meta
420 .protect_drop_table_with_incoming_sink,
421 prometheus_endpoint: opts.prometheus_endpoint,
422 prometheus_selector: opts.prometheus_selector,
423 vpc_id: opts.vpc_id,
424 security_group_id: opts.security_group_id,
425 privatelink_endpoint_default_tags,
426 periodic_space_reclaim_compaction_interval_sec: config
427 .meta
428 .periodic_space_reclaim_compaction_interval_sec,
429 telemetry_enabled: config.server.telemetry_enabled,
430 periodic_ttl_reclaim_compaction_interval_sec: config
431 .meta
432 .periodic_ttl_reclaim_compaction_interval_sec,
433 periodic_tombstone_reclaim_compaction_interval_sec: config
434 .meta
435 .periodic_tombstone_reclaim_compaction_interval_sec,
436 periodic_scheduling_compaction_group_split_interval_sec: config
437 .meta
438 .periodic_scheduling_compaction_group_split_interval_sec,
439 periodic_scheduling_compaction_group_merge_interval_sec: config
440 .meta
441 .periodic_scheduling_compaction_group_merge_interval_sec,
442 compaction_group_merge_dimension_threshold: config
443 .meta
444 .compaction_group_merge_dimension_threshold,
445 table_high_write_throughput_threshold: config
446 .meta
447 .table_high_write_throughput_threshold,
448 table_low_write_throughput_threshold: config
449 .meta
450 .table_low_write_throughput_threshold,
451 partition_vnode_count: config.meta.partition_vnode_count,
452 compact_task_table_size_partition_threshold_low: config
453 .meta
454 .compact_task_table_size_partition_threshold_low,
455 compact_task_table_size_partition_threshold_high: config
456 .meta
457 .compact_task_table_size_partition_threshold_high,
458 do_not_config_object_storage_lifecycle: config
459 .meta
460 .do_not_config_object_storage_lifecycle,
461 compaction_task_max_heartbeat_interval_secs: config
462 .meta
463 .compaction_task_max_heartbeat_interval_secs,
464 compaction_task_max_progress_interval_secs,
465 compaction_config: Some(config.meta.compaction_config),
466 hybrid_partition_node_count: config.meta.hybrid_partition_vnode_count,
467 event_log_enabled: config.meta.event_log_enabled,
468 event_log_channel_max_size: config.meta.event_log_channel_max_size,
469 advertise_addr: opts.advertise_addr,
470 cached_traces_num: config.meta.developer.cached_traces_num,
471 cached_traces_memory_limit_bytes: config
472 .meta
473 .developer
474 .cached_traces_memory_limit_bytes,
475 enable_trivial_move: config.meta.developer.enable_trivial_move,
476 enable_check_task_level_overlap: config
477 .meta
478 .developer
479 .enable_check_task_level_overlap,
480 enable_dropped_column_reclaim: config.meta.enable_dropped_column_reclaim,
481 split_group_size_ratio: config.meta.split_group_size_ratio,
482 table_stat_high_write_throughput_ratio_for_split: config
483 .meta
484 .table_stat_high_write_throughput_ratio_for_split,
485 table_stat_low_write_throughput_ratio_for_merge: config
486 .meta
487 .table_stat_low_write_throughput_ratio_for_merge,
488 table_stat_throuput_window_seconds_for_split: config
489 .meta
490 .table_stat_throuput_window_seconds_for_split,
491 table_stat_throuput_window_seconds_for_merge: config
492 .meta
493 .table_stat_throuput_window_seconds_for_merge,
494 object_store_config: config.storage.object_store,
495 max_trivial_move_task_count_per_loop: config
496 .meta
497 .developer
498 .max_trivial_move_task_count_per_loop,
499 max_get_task_probe_times: config.meta.developer.max_get_task_probe_times,
500 secret_store_private_key,
501 temp_secret_file_dir: opts.temp_secret_file_dir,
502 actor_cnt_per_worker_parallelism_hard_limit: config
503 .meta
504 .developer
505 .actor_cnt_per_worker_parallelism_hard_limit,
506 actor_cnt_per_worker_parallelism_soft_limit: config
507 .meta
508 .developer
509 .actor_cnt_per_worker_parallelism_soft_limit,
510 license_key_path: opts.license_key_path,
511 compute_client_config: config.meta.developer.compute_client_config.clone(),
512 stream_client_config: config.meta.developer.stream_client_config.clone(),
513 frontend_client_config: config.meta.developer.frontend_client_config.clone(),
514 redact_sql_option_keywords: Arc::new(
515 config
516 .batch
517 .redact_sql_option_keywords
518 .into_iter()
519 .collect(),
520 ),
521 },
522 config.system.into_init_system_params(),
523 Default::default(),
524 shutdown,
525 )
526 .await
527 .unwrap();
528 })
529}
530
531fn validate_config(config: &RwConfig) {
532 if config.meta.meta_leader_lease_secs <= 2 {
533 let error_msg = "meta leader lease secs should be larger than 2";
534 tracing::error!(error_msg);
535 panic!("{}", error_msg);
536 }
537
538 if config.meta.parallelism_control_batch_size == 0 {
539 let error_msg = "parallelism control batch size should be larger than 0";
540 tracing::error!(error_msg);
541 panic!("{}", error_msg);
542 }
543}