1#![feature(let_chains)]
16#![cfg_attr(coverage, feature(coverage_attribute))]
17
18mod server;
19
20use std::path::PathBuf;
21use std::time::Duration;
22
23use clap::Parser;
24use educe::Educe;
25pub use error::{MetaError, MetaResult};
26use redact::Secret;
27use risingwave_common::config::OverrideConfig;
28use risingwave_common::license::LicenseKey;
29use risingwave_common::util::meta_addr::MetaAddressStrategy;
30use risingwave_common::util::resource_util;
31use risingwave_common::util::tokio_util::sync::CancellationToken;
32use risingwave_common::{GIT_SHA, RW_VERSION};
33use risingwave_common_heap_profiling::HeapProfiler;
34use risingwave_meta::*;
35use risingwave_meta_service::*;
36pub use rpc::{ElectionClient, ElectionMember};
37use server::rpc_serve;
38pub use server::started::get as is_server_started;
39
40use crate::manager::MetaOpts;
41
// Command-line / environment options of the meta node.
//
// Every field is a clap option (`long`), most of them also readable from the
// `RW_*` environment variable named in its attribute. Fields marked
// `hide = true` are accepted but not listed in `--help`.
//
// Fields carrying `#[override_opts(path = ...)]` are tied to the named entry
// of the loaded config through the `OverrideConfig` derive; `start` passes the
// whole struct to `load_config` for that purpose.
// NOTE(review): presumably a `Some` value replaces the config-file value and
// `None` leaves it untouched - confirm against the derive macro.
//
// The notes in this struct are plain `//` comments on purpose: clap's derive
// turns `///` doc comments into help text, which would change `--help` output.
#[derive(Educe, Clone, Parser, OverrideConfig)]
#[educe(Debug)]
#[command(version, about = "The central metadata management service")]
pub struct MetaNodeOpts {
    // Address the server listens on. `start` parses it into the listen address
    // handed to `rpc_serve`; `Opts::meta_addr` also derives the meta address
    // from it.
    #[clap(long, env = "RW_LISTEN_ADDR", default_value = "127.0.0.1:5690")]
    pub listen_addr: String,

    // Address advertised for this node. Forwarded unparsed into both
    // `AddressInfo` and `MetaOpts` by `start`.
    #[clap(long, env = "RW_ADVERTISE_ADDR", default_value = "127.0.0.1:5690")]
    pub advertise_addr: String,

    // Optional dashboard address; parsed by `start` when present.
    #[clap(long, env = "RW_DASHBOARD_HOST")]
    pub dashboard_host: Option<String>,

    // Optional Prometheus listener address; parsed by `start` when present.
    // Note the env var / alias still use the `prometheus-host` spelling.
    #[clap(long, env = "RW_PROMETHEUS_HOST", alias = "prometheus-host")]
    pub prometheus_listener_addr: Option<String>,

    // Endpoint of the SQL meta store. `start` requires it for every backend
    // except `MetaBackend::Mem` and panics when it is missing.
    // NOTE(review): wrapped in `Secret`, presumably so `Debug` output is
    // redacted - confirm in the `redact` crate.
    #[clap(long, hide = true, env = "RW_SQL_ENDPOINT")]
    pub sql_endpoint: Option<Secret<String>>,

    // User name spliced into the connection URL for the Postgres and MySQL
    // backends.
    #[clap(long, hide = true, env = "RW_SQL_USERNAME", default_value = "")]
    pub sql_username: String,

    // Password spliced into the connection URL for the Postgres and MySQL
    // backends.
    #[clap(long, hide = true, env = "RW_SQL_PASSWORD", default_value = "")]
    pub sql_password: Secret<String>,

    // Database name spliced into the connection URL for the Postgres and MySQL
    // backends.
    #[clap(long, hide = true, env = "RW_SQL_DATABASE", default_value = "")]
    pub sql_database: String,

    // Forwarded unchanged into `MetaOpts::prometheus_endpoint`.
    #[clap(long, env = "RW_PROMETHEUS_ENDPOINT")]
    pub prometheus_endpoint: Option<String>,

    // Forwarded unchanged into `MetaOpts::prometheus_selector`.
    #[clap(long, env = "RW_PROMETHEUS_SELECTOR")]
    pub prometheus_selector: Option<String>,

    // Comma-separated `key=value` pairs; `start` splits them into a collection
    // of (key, value) strings and panics on an entry without `=`.
    #[clap(long, hide = true, env = "RW_PRIVATELINK_ENDPOINT_DEFAULT_TAGS")]
    pub privatelink_endpoint_default_tags: Option<String>,

    // Forwarded unchanged into `MetaOpts::vpc_id`.
    #[clap(long, hide = true, env = "RW_VPC_ID")]
    pub vpc_id: Option<String>,

    // Forwarded unchanged into `MetaOpts::security_group_id`. The env var is
    // named `RW_VPC_SECURITY_GROUP_ID`.
    #[clap(long, hide = true, env = "RW_VPC_SECURITY_GROUP_ID")]
    pub security_group_id: Option<String>,

    // Path handed to `load_config`; defaults to the empty string.
    #[clap(long, env = "RW_CONFIG_PATH", default_value = "")]
    pub config_path: String,

    // ----- config overrides (see the note on `override_opts` above) -----

    // Selects the meta store backend; `start` matches on the resulting
    // `config.meta.backend`.
    #[clap(long, hide = true, env = "RW_BACKEND", value_enum)]
    #[override_opts(path = meta.backend)]
    pub backend: Option<MetaBackend>,

    #[clap(long, hide = true, env = "RW_BARRIER_INTERVAL_MS")]
    #[override_opts(path = system.barrier_interval_ms)]
    pub barrier_interval_ms: Option<u32>,

    #[clap(long, hide = true, env = "RW_SSTABLE_SIZE_MB")]
    #[override_opts(path = system.sstable_size_mb)]
    pub sstable_size_mb: Option<u32>,

    #[clap(long, hide = true, env = "RW_BLOCK_SIZE_KB")]
    #[override_opts(path = system.block_size_kb)]
    pub block_size_kb: Option<u32>,

    #[clap(long, hide = true, env = "RW_BLOOM_FALSE_POSITIVE")]
    #[override_opts(path = system.bloom_false_positive)]
    pub bloom_false_positive: Option<f64>,

    #[clap(long, hide = true, env = "RW_STATE_STORE")]
    #[override_opts(path = system.state_store)]
    pub state_store: Option<String>,

    #[clap(long, hide = true, env = "RW_DATA_DIRECTORY")]
    #[override_opts(path = system.data_directory)]
    pub data_directory: Option<String>,

    // The env var keeps the `BUCKET_LIFECYCLE` spelling while the field and
    // config entry say `object_storage_lifecycle`.
    #[clap(long, hide = true, env = "RW_DO_NOT_CONFIG_BUCKET_LIFECYCLE")]
    #[override_opts(path = meta.do_not_config_object_storage_lifecycle)]
    pub do_not_config_object_storage_lifecycle: Option<bool>,

    #[clap(long, hide = true, env = "RW_BACKUP_STORAGE_URL")]
    #[override_opts(path = system.backup_storage_url)]
    pub backup_storage_url: Option<String>,

    #[clap(long, hide = true, env = "RW_BACKUP_STORAGE_DIRECTORY")]
    #[override_opts(path = system.backup_storage_directory)]
    pub backup_storage_directory: Option<String>,

    #[clap(long, hide = true, env = "RW_HEAP_PROFILING_DIR")]
    #[override_opts(path = server.heap_profiling.dir)]
    pub heap_profiling_dir: Option<String>,

    // `start` converts the resulting config value from seconds to
    // milliseconds, using 0 when it is unset.
    #[clap(long, hide = true, env = "RW_DANGEROUS_MAX_IDLE_SECS")]
    #[override_opts(path = meta.dangerous_max_idle_secs)]
    pub dangerous_max_idle_secs: Option<u64>,

    // Deprecated: still accepted on the command line, but `start` never reads
    // it.
    #[deprecated = "connector node has been deprecated."]
    #[clap(long, hide = true, env = "RW_CONNECTOR_RPC_ENDPOINT")]
    pub connector_rpc_endpoint: Option<String>,

    #[clap(long, hide = true, env = "RW_LICENSE_KEY")]
    #[override_opts(path = system.license_key)]
    pub license_key: Option<LicenseKey>,

    // Forwarded unchanged into `MetaOpts::license_key_path`.
    #[clap(long, env = "RW_LICENSE_KEY_PATH")]
    pub license_key_path: Option<PathBuf>,

    // Hex-encoded private key; `start` hex-decodes it into raw bytes.
    // Excluded from the derived `Debug` output via `educe(Debug(ignore))`.
    #[educe(Debug(ignore))]
    #[clap(long, hide = true, env = "RW_SECRET_STORE_PRIVATE_KEY_HEX")]
    pub secret_store_private_key_hex: Option<String>,

    // Forwarded unchanged into `MetaOpts::temp_secret_file_dir`.
    #[clap(
        long,
        hide = true,
        env = "RW_TEMP_SECRET_FILE_DIR",
        default_value = "./secrets"
    )]
    pub temp_secret_file_dir: String,
}
199
200impl risingwave_common::opts::Opts for MetaNodeOpts {
201 fn name() -> &'static str {
202 "meta"
203 }
204
205 fn meta_addr(&self) -> MetaAddressStrategy {
206 format!("http://{}", self.listen_addr)
207 .parse()
208 .expect("invalid listen address")
209 }
210}
211
212use std::future::Future;
213use std::pin::Pin;
214
215use risingwave_common::config::{MetaBackend, RwConfig, load_config};
216use tracing::info;
217
218pub fn start(
220 opts: MetaNodeOpts,
221 shutdown: CancellationToken,
222) -> Pin<Box<dyn Future<Output = ()> + Send>> {
223 Box::pin(async move {
226 info!("Starting meta node");
227 info!("> options: {:?}", opts);
228 let config = load_config(&opts.config_path, &opts);
229 info!("> config: {:?}", config);
230 info!("> version: {} ({})", RW_VERSION, GIT_SHA);
231 let listen_addr = opts.listen_addr.parse().unwrap();
232 let dashboard_addr = opts.dashboard_host.map(|x| x.parse().unwrap());
233 let prometheus_addr = opts.prometheus_listener_addr.map(|x| x.parse().unwrap());
234 let meta_store_config = config.meta.meta_store_config.clone();
235 let backend = match config.meta.backend {
236 MetaBackend::Mem => MetaStoreBackend::Mem,
237 MetaBackend::Sql => MetaStoreBackend::Sql {
238 endpoint: opts
239 .sql_endpoint
240 .expect("sql endpoint is required")
241 .expose_secret()
242 .to_string(),
243 config: meta_store_config,
244 },
245 MetaBackend::Sqlite => MetaStoreBackend::Sql {
246 endpoint: format!(
247 "sqlite://{}?mode=rwc",
248 opts.sql_endpoint
249 .expect("sql endpoint is required")
250 .expose_secret()
251 ),
252 config: meta_store_config,
253 },
254 MetaBackend::Postgres => MetaStoreBackend::Sql {
255 endpoint: format!(
256 "postgres://{}:{}@{}/{}",
257 opts.sql_username,
258 opts.sql_password.expose_secret(),
259 opts.sql_endpoint
260 .expect("sql endpoint is required")
261 .expose_secret(),
262 opts.sql_database
263 ),
264 config: meta_store_config,
265 },
266 MetaBackend::Mysql => MetaStoreBackend::Sql {
267 endpoint: format!(
268 "mysql://{}:{}@{}/{}",
269 opts.sql_username,
270 opts.sql_password.expose_secret(),
271 opts.sql_endpoint
272 .expect("sql endpoint is required")
273 .expose_secret(),
274 opts.sql_database
275 ),
276 config: meta_store_config,
277 },
278 };
279 validate_config(&config);
280
281 let total_memory_bytes = resource_util::memory::system_memory_available_bytes();
282 let heap_profiler =
283 HeapProfiler::new(total_memory_bytes, config.server.heap_profiling.clone());
284 heap_profiler.start();
286
287 let secret_store_private_key = opts
288 .secret_store_private_key_hex
289 .map(|key| hex::decode(key).unwrap());
290 let max_heartbeat_interval =
291 Duration::from_secs(config.meta.max_heartbeat_interval_secs as u64);
292 let max_idle_ms = config.meta.dangerous_max_idle_secs.unwrap_or(0) * 1000;
293 let in_flight_barrier_nums = config.streaming.in_flight_barrier_nums;
294 let privatelink_endpoint_default_tags =
295 opts.privatelink_endpoint_default_tags.map(|tags| {
296 tags.split(',')
297 .map(|s| {
298 let key_val = s.split_once('=').unwrap();
299 (key_val.0.to_owned(), key_val.1.to_owned())
300 })
301 .collect()
302 });
303
304 let add_info = AddressInfo {
305 advertise_addr: opts.advertise_addr.to_owned(),
306 listen_addr,
307 prometheus_addr,
308 dashboard_addr,
309 };
310
311 const MIN_TIMEOUT_INTERVAL_SEC: u64 = 20;
312 let compaction_task_max_progress_interval_secs = {
313 let retry_config = &config.storage.object_store.retry;
314 let max_streming_read_timeout_ms = (retry_config.streaming_read_attempt_timeout_ms
315 + retry_config.req_backoff_max_delay_ms)
316 * retry_config.streaming_read_retry_attempts as u64;
317 let max_streaming_upload_timeout_ms = (retry_config
318 .streaming_upload_attempt_timeout_ms
319 + retry_config.req_backoff_max_delay_ms)
320 * retry_config.streaming_upload_retry_attempts as u64;
321 let max_upload_timeout_ms = (retry_config.upload_attempt_timeout_ms
322 + retry_config.req_backoff_max_delay_ms)
323 * retry_config.upload_retry_attempts as u64;
324 let max_read_timeout_ms = (retry_config.read_attempt_timeout_ms
325 + retry_config.req_backoff_max_delay_ms)
326 * retry_config.read_retry_attempts as u64;
327 let max_timeout_ms = max_streming_read_timeout_ms
328 .max(max_upload_timeout_ms)
329 .max(max_streaming_upload_timeout_ms)
330 .max(max_read_timeout_ms)
331 .max(config.meta.compaction_task_max_progress_interval_secs * 1000);
332 max_timeout_ms / 1000
333 } + MIN_TIMEOUT_INTERVAL_SEC;
334
335 rpc_serve(
336 add_info,
337 backend,
338 max_heartbeat_interval,
339 config.meta.meta_leader_lease_secs,
340 MetaOpts {
341 enable_recovery: !config.meta.disable_recovery,
342 disable_automatic_parallelism_control: config
343 .meta
344 .disable_automatic_parallelism_control,
345 parallelism_control_batch_size: config.meta.parallelism_control_batch_size,
346 parallelism_control_trigger_period_sec: config
347 .meta
348 .parallelism_control_trigger_period_sec,
349 parallelism_control_trigger_first_delay_sec: config
350 .meta
351 .parallelism_control_trigger_first_delay_sec,
352 in_flight_barrier_nums,
353 max_idle_ms,
354 compaction_deterministic_test: config.meta.enable_compaction_deterministic,
355 default_parallelism: config.meta.default_parallelism,
356 vacuum_interval_sec: config.meta.vacuum_interval_sec,
357 time_travel_vacuum_interval_sec: config
358 .meta
359 .developer
360 .time_travel_vacuum_interval_sec,
361 vacuum_spin_interval_ms: config.meta.vacuum_spin_interval_ms,
362 hummock_version_checkpoint_interval_sec: config
363 .meta
364 .hummock_version_checkpoint_interval_sec,
365 enable_hummock_data_archive: config.meta.enable_hummock_data_archive,
366 hummock_time_travel_snapshot_interval: config
367 .meta
368 .hummock_time_travel_snapshot_interval,
369 hummock_time_travel_sst_info_fetch_batch_size: config
370 .meta
371 .developer
372 .hummock_time_travel_sst_info_fetch_batch_size,
373 hummock_time_travel_sst_info_insert_batch_size: config
374 .meta
375 .developer
376 .hummock_time_travel_sst_info_insert_batch_size,
377 hummock_time_travel_epoch_version_insert_batch_size: config
378 .meta
379 .developer
380 .hummock_time_travel_epoch_version_insert_batch_size,
381 hummock_gc_history_insert_batch_size: config
382 .meta
383 .developer
384 .hummock_gc_history_insert_batch_size,
385 hummock_time_travel_filter_out_objects_batch_size: config
386 .meta
387 .developer
388 .hummock_time_travel_filter_out_objects_batch_size,
389 min_delta_log_num_for_hummock_version_checkpoint: config
390 .meta
391 .min_delta_log_num_for_hummock_version_checkpoint,
392 min_sst_retention_time_sec: config.meta.min_sst_retention_time_sec,
393 full_gc_interval_sec: config.meta.full_gc_interval_sec,
394 full_gc_object_limit: config.meta.full_gc_object_limit,
395 gc_history_retention_time_sec: config.meta.gc_history_retention_time_sec,
396 max_inflight_time_travel_query: config.meta.max_inflight_time_travel_query,
397 enable_committed_sst_sanity_check: config.meta.enable_committed_sst_sanity_check,
398 periodic_compaction_interval_sec: config.meta.periodic_compaction_interval_sec,
399 node_num_monitor_interval_sec: config.meta.node_num_monitor_interval_sec,
400 prometheus_endpoint: opts.prometheus_endpoint,
401 prometheus_selector: opts.prometheus_selector,
402 vpc_id: opts.vpc_id,
403 security_group_id: opts.security_group_id,
404 privatelink_endpoint_default_tags,
405 periodic_space_reclaim_compaction_interval_sec: config
406 .meta
407 .periodic_space_reclaim_compaction_interval_sec,
408 telemetry_enabled: config.server.telemetry_enabled,
409 periodic_ttl_reclaim_compaction_interval_sec: config
410 .meta
411 .periodic_ttl_reclaim_compaction_interval_sec,
412 periodic_tombstone_reclaim_compaction_interval_sec: config
413 .meta
414 .periodic_tombstone_reclaim_compaction_interval_sec,
415 periodic_scheduling_compaction_group_split_interval_sec: config
416 .meta
417 .periodic_scheduling_compaction_group_split_interval_sec,
418 periodic_scheduling_compaction_group_merge_interval_sec: config
419 .meta
420 .periodic_scheduling_compaction_group_merge_interval_sec,
421 table_high_write_throughput_threshold: config
422 .meta
423 .table_high_write_throughput_threshold,
424 table_low_write_throughput_threshold: config
425 .meta
426 .table_low_write_throughput_threshold,
427 partition_vnode_count: config.meta.partition_vnode_count,
428 compact_task_table_size_partition_threshold_low: config
429 .meta
430 .compact_task_table_size_partition_threshold_low,
431 compact_task_table_size_partition_threshold_high: config
432 .meta
433 .compact_task_table_size_partition_threshold_high,
434 do_not_config_object_storage_lifecycle: config
435 .meta
436 .do_not_config_object_storage_lifecycle,
437 compaction_task_max_heartbeat_interval_secs: config
438 .meta
439 .compaction_task_max_heartbeat_interval_secs,
440 compaction_task_max_progress_interval_secs,
441 compaction_config: Some(config.meta.compaction_config),
442 hybrid_partition_node_count: config.meta.hybrid_partition_vnode_count,
443 event_log_enabled: config.meta.event_log_enabled,
444 event_log_channel_max_size: config.meta.event_log_channel_max_size,
445 advertise_addr: opts.advertise_addr,
446 cached_traces_num: config.meta.developer.cached_traces_num,
447 cached_traces_memory_limit_bytes: config
448 .meta
449 .developer
450 .cached_traces_memory_limit_bytes,
451 enable_trivial_move: config.meta.developer.enable_trivial_move,
452 enable_check_task_level_overlap: config
453 .meta
454 .developer
455 .enable_check_task_level_overlap,
456 enable_dropped_column_reclaim: config.meta.enable_dropped_column_reclaim,
457 split_group_size_ratio: config.meta.split_group_size_ratio,
458 table_stat_high_write_throughput_ratio_for_split: config
459 .meta
460 .table_stat_high_write_throughput_ratio_for_split,
461 table_stat_low_write_throughput_ratio_for_merge: config
462 .meta
463 .table_stat_low_write_throughput_ratio_for_merge,
464 table_stat_throuput_window_seconds_for_split: config
465 .meta
466 .table_stat_throuput_window_seconds_for_split,
467 table_stat_throuput_window_seconds_for_merge: config
468 .meta
469 .table_stat_throuput_window_seconds_for_merge,
470 object_store_config: config.storage.object_store,
471 max_trivial_move_task_count_per_loop: config
472 .meta
473 .developer
474 .max_trivial_move_task_count_per_loop,
475 max_get_task_probe_times: config.meta.developer.max_get_task_probe_times,
476 secret_store_private_key,
477 temp_secret_file_dir: opts.temp_secret_file_dir,
478 actor_cnt_per_worker_parallelism_hard_limit: config
479 .meta
480 .developer
481 .actor_cnt_per_worker_parallelism_hard_limit,
482 actor_cnt_per_worker_parallelism_soft_limit: config
483 .meta
484 .developer
485 .actor_cnt_per_worker_parallelism_soft_limit,
486 license_key_path: opts.license_key_path,
487 compute_client_config: config.meta.developer.compute_client_config.clone(),
488 stream_client_config: config.meta.developer.stream_client_config.clone(),
489 frontend_client_config: config.meta.developer.frontend_client_config.clone(),
490 },
491 config.system.into_init_system_params(),
492 Default::default(),
493 shutdown,
494 )
495 .await
496 .unwrap();
497 })
498}
499
500fn validate_config(config: &RwConfig) {
501 if config.meta.meta_leader_lease_secs <= 2 {
502 let error_msg = "meta leader lease secs should be larger than 2";
503 tracing::error!(error_msg);
504 panic!("{}", error_msg);
505 }
506
507 if config.meta.parallelism_control_batch_size == 0 {
508 let error_msg = "parallelism control batch size should be larger than 0";
509 tracing::error!(error_msg);
510 panic!("{}", error_msg);
511 }
512}