1use std::ops::Deref;
16use std::path::PathBuf;
17use std::sync::Arc;
18use std::sync::atomic::AtomicU32;
19
20use anyhow::Context;
21use risingwave_common::config::{
22 CompactionConfig, DefaultParallelism, ObjectStoreConfig, RpcClientConfig, SessionInitConfig,
23};
24use risingwave_common::system_param::reader::SystemParamsReader;
25use risingwave_common::{bail, system_param};
26use risingwave_meta_model::prelude::Cluster;
27use risingwave_pb::meta::SystemParams;
28use risingwave_rpc_client::{
29 FrontendClientPool, FrontendClientPoolRef, StreamClientPool, StreamClientPoolRef,
30};
31use risingwave_sqlparser::ast::RedactSqlOptionKeywordsRef;
32use sea_orm::EntityTrait;
33
34use crate::MetaResult;
35use crate::barrier::SharedActorInfos;
36use crate::controller::SqlMetaStore;
37use crate::controller::id::{
38 IdGeneratorManager as SqlIdGeneratorManager, IdGeneratorManagerRef as SqlIdGeneratorManagerRef,
39};
40use crate::controller::session_params::{SessionParamsController, SessionParamsControllerRef};
41use crate::controller::system_param::{SystemParamsController, SystemParamsControllerRef};
42use crate::hummock::sequence::SequenceGenerator;
43use crate::manager::event_log::{EventLogManagerRef, start_event_log_manager};
44use crate::manager::{IdleManager, IdleManagerRef, NotificationManager, NotificationManagerRef};
45use crate::model::ClusterId;
46
/// Environment of the meta service: a cloneable bundle of the managers, client pools and
/// options that [`MetaSrvEnv::new`] constructs.
///
/// Most fields are private and exposed through accessor methods on this type.
#[derive(Clone)]
pub struct MetaSrvEnv {
    /// ID generator manager, built from the meta store connection in `new`.
    id_gen_manager_impl: SqlIdGeneratorManagerRef,

    /// System parameter controller, seeded with the `SystemParams` passed to `new`.
    system_param_manager_impl: SystemParamsControllerRef,

    /// Session parameter controller, seeded with the `SessionInitConfig` passed to `new`.
    session_param_manager_impl: SessionParamsControllerRef,

    /// The meta store handle passed to `new`.
    meta_store_impl: SqlMetaStore,

    /// Notification manager, constructed over a clone of the meta store.
    notification_manager: NotificationManagerRef,

    /// Shared actor infos, constructed with a clone of the notification manager.
    pub shared_actor_info: SharedActorInfos,

    /// Stream client pool, configured from `MetaOpts::stream_client_config`.
    stream_client_pool: StreamClientPoolRef,

    /// Frontend client pool, configured from `MetaOpts::frontend_client_config`.
    frontend_client_pool: FrontendClientPoolRef,

    /// Idle manager, configured from `MetaOpts::max_idle_ms`.
    idle_manager: IdleManagerRef,

    /// Event log manager, started from `MetaOpts::event_log_enabled` and
    /// `MetaOpts::event_log_channel_max_size`.
    event_log_manager: EventLogManagerRef,

    /// Cluster id, read from the `Cluster` table while constructing the environment.
    cluster_id: ClusterId,

    /// Sequence generator created over a clone of the meta store connection.
    pub hummock_seq: Arc<SequenceGenerator>,

    /// `await_tree` registry, created with its default configuration.
    await_tree_reg: await_tree::Registry,

    /// The options this environment was created with.
    pub opts: Arc<MetaOpts>,

    /// Shared atomic counter, initialised to 0 in `new`.
    /// NOTE(review): presumably used to allocate actor ids — confirm against callers.
    actor_id_generator: Arc<AtomicU32>,
}
92
/// Options for the meta service.
///
/// [`MetaSrvEnv::new`] takes a value of this type and stores it behind an `Arc` in
/// `MetaSrvEnv::opts`; [`MetaOpts::test`] builds a preset for tests.
///
/// The struct derives `serde::Serialize`, so renaming or reordering fields changes the
/// serialized output.
///
/// NOTE(review): unless a comment names the code that reads a field, the grouping comments
/// below are inferred from field names and types only (the consumers are not visible in this
/// file) and should be confirmed before being relied on. Name suffixes such as `_sec`, `_ms`,
/// `_millis` and `_bytes` presumably denote the unit.
#[derive(Clone, serde::Serialize)]
pub struct MetaOpts {
    // --- Recovery, parallelism control and barriers (presumed from names) ---
    pub enable_recovery: bool,
    pub disable_automatic_parallelism_control: bool,
    pub parallelism_control_batch_size: usize,
    pub parallelism_control_trigger_period_sec: u64,
    pub parallelism_control_trigger_first_delay_sec: u64,
    pub in_flight_barrier_nums: usize,
    /// Passed to `IdleManager::new` by `MetaSrvEnv::new`.
    pub max_idle_ms: u64,
    pub compaction_deterministic_test: bool,
    pub default_parallelism: DefaultParallelism,

    // --- Vacuum, Iceberg, checkpoint and time-travel settings (presumed from names) ---
    pub vacuum_interval_sec: u64,
    pub vacuum_spin_interval_ms: u64,
    pub iceberg_gc_interval_sec: u64,
    pub iceberg_compaction_report_timeout_sec: u64,
    pub iceberg_compaction_config_refresh_interval_sec: u64,
    pub time_travel_vacuum_interval_sec: u64,
    /// Optional; `MetaOpts::test` leaves it as `None`.
    pub time_travel_vacuum_max_version_count: Option<u32>,
    pub hummock_version_checkpoint_interval_sec: u64,
    pub enable_hummock_data_archive: bool,
    pub checkpoint_compression_algorithm: risingwave_common::config::CheckpointCompression,
    pub checkpoint_read_chunk_size: usize,
    pub checkpoint_read_max_in_flight_chunks: usize,
    pub hummock_time_travel_snapshot_interval: u64,
    pub hummock_time_travel_sst_info_fetch_batch_size: usize,
    pub hummock_time_travel_sst_info_insert_batch_size: usize,
    pub hummock_time_travel_epoch_version_insert_batch_size: usize,
    pub hummock_time_travel_delta_fetch_batch_size: usize,
    pub hummock_gc_history_insert_batch_size: usize,
    pub hummock_time_travel_filter_out_objects_batch_size: usize,
    pub hummock_time_travel_filter_out_objects_v1: bool,
    pub hummock_time_travel_filter_out_objects_list_version_batch_size: usize,
    pub hummock_time_travel_filter_out_objects_list_delta_batch_size: usize,
    pub min_delta_log_num_for_hummock_version_checkpoint: u64,
    pub min_sst_retention_time_sec: u64,
    pub full_gc_interval_sec: u64,
    pub full_gc_object_limit: u64,
    pub gc_history_retention_time_sec: u64,
    pub max_inflight_time_travel_query: u64,
    pub enable_committed_sst_sanity_check: bool,
    pub periodic_compaction_interval_sec: u64,
    pub node_num_monitor_interval_sec: u64,
    pub protect_drop_table_with_incoming_sink: bool,

    // --- Optional external integration settings; all `None` in `MetaOpts::test` ---
    pub prometheus_endpoint: Option<String>,

    pub prometheus_selector: Option<String>,

    pub vpc_id: Option<String>,

    pub security_group_id: Option<String>,

    /// Optional list of `(String, String)` pairs.
    /// NOTE(review): presumably tag key/value pairs — confirm against the consumer.
    pub privatelink_endpoint_default_tags: Option<Vec<(String, String)>>,

    // --- Periodic compaction scheduling (presumed from names) ---
    pub periodic_space_reclaim_compaction_interval_sec: u64,

    pub telemetry_enabled: bool,
    pub periodic_ttl_reclaim_compaction_interval_sec: u64,

    pub periodic_tombstone_reclaim_compaction_interval_sec: u64,

    pub periodic_scheduling_compaction_group_split_interval_sec: u64,
    pub enable_compaction_group_normalize: bool,
    pub max_normalize_splits_per_round: u64,

    pub do_not_config_object_storage_lifecycle: bool,

    pub partition_vnode_count: u32,

    pub table_high_write_throughput_threshold: u64,
    pub table_low_write_throughput_threshold: u64,

    pub compaction_task_max_heartbeat_interval_secs: u64,
    pub compaction_task_max_progress_interval_secs: u64,
    pub compaction_task_id_refill_capacity: u32,
    /// Optional compaction configuration; `MetaOpts::test` leaves it as `None`.
    pub compaction_config: Option<CompactionConfig>,

    pub hybrid_partition_node_count: u32,

    /// Passed as the first argument to `start_event_log_manager` by `MetaSrvEnv::new`.
    pub event_log_enabled: bool,
    /// Passed as the second argument to `start_event_log_manager` by `MetaSrvEnv::new`.
    pub event_log_channel_max_size: u32,
    pub advertise_addr: String,
    pub cached_traces_num: u32,
    pub cached_traces_memory_limit_bytes: usize,

    // --- Compaction behaviour switches and table statistics (presumed from names) ---
    pub enable_trivial_move: bool,

    pub enable_check_task_level_overlap: bool,
    pub enable_dropped_column_reclaim: bool,

    pub split_group_size_ratio: f64,

    pub refresh_scheduler_interval_sec: u64,

    pub table_stat_high_write_throughput_ratio_for_split: f64,

    pub table_stat_low_write_throughput_ratio_for_merge: f64,

    // NOTE: `throuput` is the spelling used by these two public field names; it is part of
    // the interface (and of the serialized output), so it is left unchanged here.
    pub table_stat_throuput_window_seconds_for_split: usize,

    pub table_stat_throuput_window_seconds_for_merge: usize,

    pub object_store_config: ObjectStoreConfig,

    pub max_trivial_move_task_count_per_loop: usize,

    pub max_get_task_probe_times: usize,

    pub compact_task_table_size_partition_threshold_low: u64,
    pub compact_task_table_size_partition_threshold_high: u64,

    pub periodic_scheduling_compaction_group_merge_interval_sec: u64,

    pub compaction_group_merge_dimension_threshold: f64,

    // --- Secrets (presumed from names) ---
    /// Optional raw key bytes; `MetaOpts::test` fills this with a fixed hex-decoded value.
    pub secret_store_private_key: Option<Vec<u8>>,
    pub temp_secret_file_dir: String,

    // --- Actor count limits and table change log batching (presumed from names) ---
    pub actor_cnt_per_worker_parallelism_hard_limit: usize,
    pub actor_cnt_per_worker_parallelism_soft_limit: usize,

    pub table_change_log_insert_batch_size: u64,
    pub table_change_log_delete_batch_size: u64,

    /// When this is `Some`, `MetaSrvEnv::new` fails if the initial `license_key` system
    /// parameter differs from its default, since the two may not be set at the same time.
    pub license_key_path: Option<PathBuf>,

    pub compute_client_config: RpcClientConfig,
    /// Cloned into `StreamClientPool::new` by `MetaSrvEnv::new`.
    pub stream_client_config: RpcClientConfig,
    /// Cloned into `FrontendClientPool::new` by `MetaSrvEnv::new`.
    pub frontend_client_config: RpcClientConfig,
    pub redact_sql_option_keywords: RedactSqlOptionKeywordsRef,

    // --- CDC table split initialisation and miscellaneous switches (presumed from names) ---
    pub cdc_table_split_init_sleep_interval_splits: u64,
    pub cdc_table_split_init_sleep_duration_millis: u64,
    pub cdc_table_split_init_insert_batch_size: u64,

    pub enable_legacy_table_migration: bool,
    pub pause_on_next_bootstrap_offline: bool,
}
314
315impl MetaOpts {
316 pub fn test(enable_recovery: bool) -> Self {
318 Self {
319 enable_recovery,
320 disable_automatic_parallelism_control: false,
321 parallelism_control_batch_size: 1,
322 parallelism_control_trigger_period_sec: 10,
323 parallelism_control_trigger_first_delay_sec: 30,
324 in_flight_barrier_nums: 40,
325 max_idle_ms: 0,
326 compaction_deterministic_test: false,
327 default_parallelism: DefaultParallelism::Full,
328 vacuum_interval_sec: 30,
329 time_travel_vacuum_interval_sec: 30,
330 time_travel_vacuum_max_version_count: None,
331 vacuum_spin_interval_ms: 0,
332 iceberg_gc_interval_sec: 3600,
333 iceberg_compaction_report_timeout_sec: 30 * 60,
334 iceberg_compaction_config_refresh_interval_sec: 60,
335 hummock_version_checkpoint_interval_sec: 30,
336 enable_hummock_data_archive: false,
337 checkpoint_compression_algorithm:
338 risingwave_common::config::CheckpointCompression::Zstd,
339 checkpoint_read_chunk_size: 128 * 1024 * 1024,
340 checkpoint_read_max_in_flight_chunks: 4,
341 hummock_time_travel_snapshot_interval: 0,
342 hummock_time_travel_sst_info_fetch_batch_size: 10_000,
343 hummock_time_travel_sst_info_insert_batch_size: 10,
344 hummock_time_travel_epoch_version_insert_batch_size: 1000,
345 hummock_time_travel_delta_fetch_batch_size: 1000,
346 hummock_gc_history_insert_batch_size: 1000,
347 hummock_time_travel_filter_out_objects_batch_size: 1000,
348 hummock_time_travel_filter_out_objects_v1: false,
349 hummock_time_travel_filter_out_objects_list_version_batch_size: 10,
350 hummock_time_travel_filter_out_objects_list_delta_batch_size: 1000,
351 min_delta_log_num_for_hummock_version_checkpoint: 1,
352 min_sst_retention_time_sec: 3600 * 24 * 7,
353 full_gc_interval_sec: 3600 * 24 * 7,
354 full_gc_object_limit: 100_000,
355 gc_history_retention_time_sec: 3600 * 24 * 7,
356 max_inflight_time_travel_query: 1000,
357 enable_committed_sst_sanity_check: false,
358 periodic_compaction_interval_sec: 300,
359 node_num_monitor_interval_sec: 10,
360 protect_drop_table_with_incoming_sink: false,
361 prometheus_endpoint: None,
362 prometheus_selector: None,
363 vpc_id: None,
364 security_group_id: None,
365 privatelink_endpoint_default_tags: None,
366 periodic_space_reclaim_compaction_interval_sec: 60,
367 telemetry_enabled: false,
368 periodic_ttl_reclaim_compaction_interval_sec: 60,
369 periodic_tombstone_reclaim_compaction_interval_sec: 60,
370 periodic_scheduling_compaction_group_split_interval_sec: 60,
371 enable_compaction_group_normalize: false,
372 max_normalize_splits_per_round: 4,
373 compact_task_table_size_partition_threshold_low: 128 * 1024 * 1024,
374 compact_task_table_size_partition_threshold_high: 512 * 1024 * 1024,
375 table_high_write_throughput_threshold: 128 * 1024 * 1024,
376 table_low_write_throughput_threshold: 64 * 1024 * 1024,
377 do_not_config_object_storage_lifecycle: true,
378 partition_vnode_count: 32,
379 compaction_task_max_heartbeat_interval_secs: 0,
380 compaction_task_max_progress_interval_secs: 1,
381 compaction_task_id_refill_capacity: 64,
382 compaction_config: None,
383 hybrid_partition_node_count: 4,
384 event_log_enabled: false,
385 event_log_channel_max_size: 1,
386 advertise_addr: "".to_owned(),
387 cached_traces_num: 1,
388 cached_traces_memory_limit_bytes: usize::MAX,
389 enable_trivial_move: true,
390 enable_check_task_level_overlap: true,
391 enable_dropped_column_reclaim: false,
392 object_store_config: ObjectStoreConfig::default(),
393 max_trivial_move_task_count_per_loop: 256,
394 max_get_task_probe_times: 5,
395 secret_store_private_key: Some(
396 hex::decode("0123456789abcdef0123456789abcdef").unwrap(),
397 ),
398 temp_secret_file_dir: "./secrets".to_owned(),
399 actor_cnt_per_worker_parallelism_hard_limit: usize::MAX,
400 actor_cnt_per_worker_parallelism_soft_limit: usize::MAX,
401 split_group_size_ratio: 0.9,
402 table_stat_high_write_throughput_ratio_for_split: 0.5,
403 table_stat_low_write_throughput_ratio_for_merge: 0.7,
404 table_stat_throuput_window_seconds_for_split: 60,
405 table_stat_throuput_window_seconds_for_merge: 240,
406 periodic_scheduling_compaction_group_merge_interval_sec: 60 * 10,
407 compaction_group_merge_dimension_threshold: 1.2,
408 license_key_path: None,
409 compute_client_config: RpcClientConfig::default(),
410 stream_client_config: RpcClientConfig::default(),
411 frontend_client_config: RpcClientConfig::default(),
412 redact_sql_option_keywords: Arc::new(Default::default()),
413 cdc_table_split_init_sleep_interval_splits: 1000,
414 cdc_table_split_init_sleep_duration_millis: 10,
415 cdc_table_split_init_insert_batch_size: 1000,
416 enable_legacy_table_migration: true,
417 refresh_scheduler_interval_sec: 60,
418 pause_on_next_bootstrap_offline: false,
419 table_change_log_insert_batch_size: 1000,
420 table_change_log_delete_batch_size: 1000,
421 }
422 }
423}
424
425impl MetaSrvEnv {
426 pub async fn new(
427 opts: MetaOpts,
428 mut init_system_params: SystemParams,
429 session_init: SessionInitConfig,
430 meta_store_impl: SqlMetaStore,
431 ) -> MetaResult<Self> {
432 let idle_manager = Arc::new(IdleManager::new(opts.max_idle_ms));
433 let stream_client_pool =
434 Arc::new(StreamClientPool::new(1, opts.stream_client_config.clone())); let frontend_client_pool = Arc::new(FrontendClientPool::new(
436 1,
437 opts.frontend_client_config.clone(),
438 ));
439 let event_log_manager = Arc::new(start_event_log_manager(
440 opts.event_log_enabled,
441 opts.event_log_channel_max_size,
442 ));
443
444 if opts.license_key_path.is_some()
447 && init_system_params.license_key
448 != system_param::default::license_key_opt().map(Into::into)
449 {
450 bail!(
451 "argument `--license-key-path` (or env var `RW_LICENSE_KEY_PATH`) and \
452 system parameter `license_key` (or env var `RW_LICENSE_KEY`) may not \
453 be set at the same time"
454 );
455 }
456
457 let cluster_first_launch = meta_store_impl.up().await.context(
458 "Failed to initialize the meta store, \
459 this may happen if there's existing metadata incompatible with the current version of RisingWave, \
460 e.g., downgrading from a newer release or a nightly build to an older one. \
461 For a single-node deployment, you may want to reset all data by deleting the data directory, \
462 typically located at `~/.risingwave`.",
463 )?;
464
465 let notification_manager =
466 Arc::new(NotificationManager::new(meta_store_impl.clone()).await);
467 let cluster_id = Cluster::find()
468 .one(&meta_store_impl.conn)
469 .await?
470 .map(|c| c.cluster_id.to_string().into())
471 .unwrap();
472
473 init_system_params.use_new_object_prefix_strategy = Some(cluster_first_launch);
479
480 let system_param_controller = Arc::new(
481 SystemParamsController::new(
482 meta_store_impl.clone(),
483 notification_manager.clone(),
484 init_system_params,
485 )
486 .await?,
487 );
488 let session_param_controller = Arc::new(
489 SessionParamsController::new(
490 meta_store_impl.clone(),
491 notification_manager.clone(),
492 session_init,
493 )
494 .await?,
495 );
496 Ok(Self {
497 id_gen_manager_impl: Arc::new(SqlIdGeneratorManager::new(&meta_store_impl.conn).await?),
498 system_param_manager_impl: system_param_controller,
499 session_param_manager_impl: session_param_controller,
500 meta_store_impl: meta_store_impl.clone(),
501 shared_actor_info: SharedActorInfos::new(notification_manager.clone()),
502 notification_manager,
503 stream_client_pool,
504 frontend_client_pool,
505 idle_manager,
506 event_log_manager,
507 cluster_id,
508 hummock_seq: Arc::new(SequenceGenerator::new(meta_store_impl.conn.clone())),
509 opts: opts.into(),
510 await_tree_reg: await_tree::Registry::new(Default::default()),
512 actor_id_generator: Arc::new(AtomicU32::new(0)),
513 })
514 }
515
516 pub fn meta_store(&self) -> SqlMetaStore {
517 self.meta_store_impl.clone()
518 }
519
520 pub fn meta_store_ref(&self) -> &SqlMetaStore {
521 &self.meta_store_impl
522 }
523
524 pub fn id_gen_manager(&self) -> &SqlIdGeneratorManagerRef {
525 &self.id_gen_manager_impl
526 }
527
528 pub fn notification_manager_ref(&self) -> NotificationManagerRef {
529 self.notification_manager.clone()
530 }
531
532 pub fn notification_manager(&self) -> &NotificationManager {
533 self.notification_manager.deref()
534 }
535
536 pub fn idle_manager_ref(&self) -> IdleManagerRef {
537 self.idle_manager.clone()
538 }
539
540 pub fn idle_manager(&self) -> &IdleManager {
541 self.idle_manager.deref()
542 }
543
544 pub fn actor_id_generator(&self) -> &AtomicU32 {
545 self.actor_id_generator.deref()
546 }
547
548 pub async fn system_params_reader(&self) -> SystemParamsReader {
549 self.system_param_manager_impl.get_params().await
550 }
551
552 pub fn system_params_manager_impl_ref(&self) -> SystemParamsControllerRef {
553 self.system_param_manager_impl.clone()
554 }
555
556 pub fn session_params_manager_impl_ref(&self) -> SessionParamsControllerRef {
557 self.session_param_manager_impl.clone()
558 }
559
560 pub fn stream_client_pool_ref(&self) -> StreamClientPoolRef {
561 self.stream_client_pool.clone()
562 }
563
564 pub fn stream_client_pool(&self) -> &StreamClientPool {
565 self.stream_client_pool.deref()
566 }
567
568 pub fn frontend_client_pool(&self) -> &FrontendClientPool {
569 self.frontend_client_pool.deref()
570 }
571
572 pub fn cluster_id(&self) -> &ClusterId {
573 &self.cluster_id
574 }
575
576 pub fn event_log_manager_ref(&self) -> EventLogManagerRef {
577 self.event_log_manager.clone()
578 }
579
580 pub fn await_tree_reg(&self) -> &await_tree::Registry {
581 &self.await_tree_reg
582 }
583
584 pub fn shared_actor_infos(&self) -> &SharedActorInfos {
585 &self.shared_actor_info
586 }
587}
588
#[cfg(any(test, feature = "test"))]
impl MetaSrvEnv {
    /// Creates an environment for tests using `MetaOpts::test(false)` and the test system
    /// parameters without modification.
    pub async fn for_test() -> Self {
        let opts = MetaOpts::test(false);
        Self::for_test_opts(opts, |_params| {}).await
    }

    /// Creates an environment for tests from `opts`.
    ///
    /// `on_test_system_params` is called once with the test system parameters so the caller
    /// can adjust them before the environment is built. The session init config is its
    /// default value and the meta store comes from `SqlMetaStore::for_test()`.
    ///
    /// # Panics
    ///
    /// Panics if [`MetaSrvEnv::new`] returns an error.
    pub async fn for_test_opts(
        opts: MetaOpts,
        on_test_system_params: impl FnOnce(&mut risingwave_pb::meta::PbSystemParams),
    ) -> Self {
        let mut params = system_param::system_params_for_test();
        on_test_system_params(&mut params);

        let session_init = SessionInitConfig::default();
        let store = SqlMetaStore::for_test().await;
        let env = Self::new(opts, params, session_init, store).await;
        env.unwrap()
    }
}