1use std::ops::Deref;
16use std::path::PathBuf;
17use std::sync::Arc;
18use std::sync::atomic::AtomicU32;
19
20use anyhow::Context;
21use risingwave_common::config::{
22 CompactionConfig, DefaultParallelism, ObjectStoreConfig, RpcClientConfig,
23};
24use risingwave_common::session_config::SessionConfig;
25use risingwave_common::system_param::reader::SystemParamsReader;
26use risingwave_common::{bail, system_param};
27use risingwave_meta_model::prelude::Cluster;
28use risingwave_pb::meta::SystemParams;
29use risingwave_rpc_client::{
30 FrontendClientPool, FrontendClientPoolRef, StreamClientPool, StreamClientPoolRef,
31};
32use risingwave_sqlparser::ast::RedactSqlOptionKeywordsRef;
33use sea_orm::EntityTrait;
34
35use crate::MetaResult;
36use crate::barrier::SharedActorInfos;
37use crate::controller::SqlMetaStore;
38use crate::controller::id::{
39 IdGeneratorManager as SqlIdGeneratorManager, IdGeneratorManagerRef as SqlIdGeneratorManagerRef,
40};
41use crate::controller::session_params::{SessionParamsController, SessionParamsControllerRef};
42use crate::controller::system_param::{SystemParamsController, SystemParamsControllerRef};
43use crate::hummock::sequence::SequenceGenerator;
44use crate::manager::event_log::{EventLogManagerRef, start_event_log_manager};
45use crate::manager::{IdleManager, IdleManagerRef, NotificationManager, NotificationManagerRef};
46use crate::model::ClusterId;
47
/// Shared environment of the meta service.
///
/// An instance is assembled by [`MetaSrvEnv::new`]. The struct derives `Clone`, so it can be
/// handed out by value to the components that need it.
#[derive(Clone)]
pub struct MetaSrvEnv {
    /// ID generator manager, created in `new` from the meta store connection.
    id_gen_manager_impl: SqlIdGeneratorManagerRef,

    /// System parameter controller, created in `new` from the meta store, the notification
    /// manager and the initial system parameters.
    system_param_manager_impl: SystemParamsControllerRef,

    /// Session parameter controller, created in `new` from the meta store, the notification
    /// manager and the initial session config.
    session_param_manager_impl: SessionParamsControllerRef,

    /// The SQL meta store handle passed to `new`.
    meta_store_impl: SqlMetaStore,

    /// Notification manager, created in `new` from a clone of the meta store.
    notification_manager: NotificationManagerRef,

    /// Shared actor infos, created in `new` from the notification manager.
    pub shared_actor_info: SharedActorInfos,

    /// Stream client pool, configured from `MetaOpts::stream_client_config`.
    stream_client_pool: StreamClientPoolRef,

    /// Frontend client pool, configured from `MetaOpts::frontend_client_config`.
    frontend_client_pool: FrontendClientPoolRef,

    /// Idle manager, created in `new` from `MetaOpts::max_idle_ms`.
    idle_manager: IdleManagerRef,

    /// Event log manager, started in `new` from `MetaOpts::event_log_enabled` and
    /// `MetaOpts::event_log_channel_max_size`.
    event_log_manager: EventLogManagerRef,

    /// Cluster id, read in `new` from the `Cluster` entity in the meta store.
    cluster_id: ClusterId,

    /// Sequence generator, created in `new` over a clone of the meta store connection.
    pub hummock_seq: Arc<SequenceGenerator>,

    /// Await-tree registry, created in `new` with the default configuration.
    await_tree_reg: await_tree::Registry,

    /// The options this environment was created with.
    pub opts: Arc<MetaOpts>,

    /// Atomic counter that starts at 0 when the environment is created.
    // NOTE(review): presumably used to allocate actor ids — confirm against callers.
    actor_id_generator: Arc<AtomicU32>,
}
93
/// Options of the meta service.
///
/// The struct derives `Clone` and `serde::Serialize`. [`MetaOpts::test`] provides a fixed set of
/// values for tests.
///
/// Most fields are consumed outside this file. Units are only hinted at by name suffixes such
/// as `_sec`, `_ms` and `_bytes`, so confirm the exact semantics at the use site. Fields whose
/// use is visible in this file are documented individually below.
#[derive(Clone, serde::Serialize)]
pub struct MetaOpts {
    /// The only option that `MetaOpts::test` takes from its caller.
    pub enable_recovery: bool,
    pub disable_automatic_parallelism_control: bool,
    pub parallelism_control_batch_size: usize,
    pub parallelism_control_trigger_period_sec: u64,
    pub parallelism_control_trigger_first_delay_sec: u64,
    pub in_flight_barrier_nums: usize,
    /// Passed to `IdleManager::new` when the environment is built in `MetaSrvEnv::new`.
    pub max_idle_ms: u64,
    pub compaction_deterministic_test: bool,
    pub default_parallelism: DefaultParallelism,

    pub vacuum_interval_sec: u64,
    pub vacuum_spin_interval_ms: u64,
    pub iceberg_gc_interval_sec: u64,
    pub iceberg_compaction_report_timeout_sec: u64,
    pub iceberg_compaction_config_refresh_interval_sec: u64,
    pub time_travel_vacuum_interval_sec: u64,
    pub time_travel_vacuum_max_version_count: Option<u32>,
    pub hummock_version_checkpoint_interval_sec: u64,
    pub enable_hummock_data_archive: bool,
    pub checkpoint_compression_algorithm: risingwave_common::config::CheckpointCompression,
    pub checkpoint_read_chunk_size: usize,
    pub checkpoint_read_max_in_flight_chunks: usize,
    pub hummock_time_travel_snapshot_interval: u64,
    pub hummock_time_travel_sst_info_fetch_batch_size: usize,
    pub hummock_time_travel_sst_info_insert_batch_size: usize,
    pub hummock_time_travel_epoch_version_insert_batch_size: usize,
    pub hummock_time_travel_delta_fetch_batch_size: usize,
    pub hummock_gc_history_insert_batch_size: usize,
    pub hummock_time_travel_filter_out_objects_batch_size: usize,
    pub hummock_time_travel_filter_out_objects_v1: bool,
    pub hummock_time_travel_filter_out_objects_list_version_batch_size: usize,
    pub hummock_time_travel_filter_out_objects_list_delta_batch_size: usize,
    pub min_delta_log_num_for_hummock_version_checkpoint: u64,
    pub min_sst_retention_time_sec: u64,
    pub full_gc_interval_sec: u64,
    pub full_gc_object_limit: u64,
    pub gc_history_retention_time_sec: u64,
    pub max_inflight_time_travel_query: u64,
    pub enable_committed_sst_sanity_check: bool,
    pub periodic_compaction_interval_sec: u64,
    pub node_num_monitor_interval_sec: u64,
    pub protect_drop_table_with_incoming_sink: bool,
    pub prometheus_endpoint: Option<String>,

    pub prometheus_selector: Option<String>,

    pub vpc_id: Option<String>,

    pub security_group_id: Option<String>,

    pub privatelink_endpoint_default_tags: Option<Vec<(String, String)>>,

    pub periodic_space_reclaim_compaction_interval_sec: u64,

    pub telemetry_enabled: bool,
    pub periodic_ttl_reclaim_compaction_interval_sec: u64,

    pub periodic_tombstone_reclaim_compaction_interval_sec: u64,

    pub periodic_scheduling_compaction_group_split_interval_sec: u64,
    pub enable_compaction_group_normalize: bool,
    pub max_normalize_splits_per_round: u64,

    pub do_not_config_object_storage_lifecycle: bool,

    pub partition_vnode_count: u32,

    pub table_high_write_throughput_threshold: u64,
    pub table_low_write_throughput_threshold: u64,

    pub compaction_task_max_heartbeat_interval_secs: u64,
    pub compaction_task_max_progress_interval_secs: u64,
    pub compaction_task_id_refill_capacity: u32,
    pub compaction_config: Option<CompactionConfig>,

    pub hybrid_partition_node_count: u32,

    /// Passed to `start_event_log_manager` in `MetaSrvEnv::new`.
    pub event_log_enabled: bool,
    /// Passed to `start_event_log_manager` in `MetaSrvEnv::new`.
    pub event_log_channel_max_size: u32,
    pub advertise_addr: String,
    pub cached_traces_num: u32,
    pub cached_traces_memory_limit_bytes: usize,

    pub enable_trivial_move: bool,

    pub enable_check_task_level_overlap: bool,
    pub enable_dropped_column_reclaim: bool,

    pub split_group_size_ratio: f64,

    pub refresh_scheduler_interval_sec: u64,

    pub table_stat_high_write_throughput_ratio_for_split: f64,

    pub table_stat_low_write_throughput_ratio_for_merge: f64,

    // NOTE(review): `throuput` in the next two field names looks like a misspelling of
    // `throughput`. The fields are public and serialized, so renaming them would change the
    // interface.
    pub table_stat_throuput_window_seconds_for_split: usize,

    pub table_stat_throuput_window_seconds_for_merge: usize,

    pub object_store_config: ObjectStoreConfig,

    pub max_trivial_move_task_count_per_loop: usize,

    pub max_get_task_probe_times: usize,

    pub compact_task_table_size_partition_threshold_low: u64,
    pub compact_task_table_size_partition_threshold_high: u64,

    pub periodic_scheduling_compaction_group_merge_interval_sec: u64,

    pub compaction_group_merge_dimension_threshold: f64,

    /// Raw key bytes. `MetaOpts::test` fills this with a fixed key decoded from a hex string.
    pub secret_store_private_key: Option<Vec<u8>>,
    pub temp_secret_file_dir: String,

    pub actor_cnt_per_worker_parallelism_hard_limit: usize,
    pub actor_cnt_per_worker_parallelism_soft_limit: usize,

    pub table_change_log_insert_batch_size: u64,
    pub table_change_log_delete_batch_size: u64,

    /// `MetaSrvEnv::new` fails when this is set and the initial `license_key` system parameter
    /// differs from its default.
    pub license_key_path: Option<PathBuf>,

    pub compute_client_config: RpcClientConfig,
    /// Cloned into the `StreamClientPool` created by `MetaSrvEnv::new`.
    pub stream_client_config: RpcClientConfig,
    /// Cloned into the `FrontendClientPool` created by `MetaSrvEnv::new`.
    pub frontend_client_config: RpcClientConfig,
    pub redact_sql_option_keywords: RedactSqlOptionKeywordsRef,

    pub cdc_table_split_init_sleep_interval_splits: u64,
    pub cdc_table_split_init_sleep_duration_millis: u64,
    pub cdc_table_split_init_insert_batch_size: u64,

    pub enable_legacy_table_migration: bool,
    pub pause_on_next_bootstrap_offline: bool,
}
315
316impl MetaOpts {
317 pub fn test(enable_recovery: bool) -> Self {
319 Self {
320 enable_recovery,
321 disable_automatic_parallelism_control: false,
322 parallelism_control_batch_size: 1,
323 parallelism_control_trigger_period_sec: 10,
324 parallelism_control_trigger_first_delay_sec: 30,
325 in_flight_barrier_nums: 40,
326 max_idle_ms: 0,
327 compaction_deterministic_test: false,
328 default_parallelism: DefaultParallelism::Full,
329 vacuum_interval_sec: 30,
330 time_travel_vacuum_interval_sec: 30,
331 time_travel_vacuum_max_version_count: None,
332 vacuum_spin_interval_ms: 0,
333 iceberg_gc_interval_sec: 3600,
334 iceberg_compaction_report_timeout_sec: 30 * 60,
335 iceberg_compaction_config_refresh_interval_sec: 60,
336 hummock_version_checkpoint_interval_sec: 30,
337 enable_hummock_data_archive: false,
338 checkpoint_compression_algorithm:
339 risingwave_common::config::CheckpointCompression::Zstd,
340 checkpoint_read_chunk_size: 128 * 1024 * 1024,
341 checkpoint_read_max_in_flight_chunks: 4,
342 hummock_time_travel_snapshot_interval: 0,
343 hummock_time_travel_sst_info_fetch_batch_size: 10_000,
344 hummock_time_travel_sst_info_insert_batch_size: 10,
345 hummock_time_travel_epoch_version_insert_batch_size: 1000,
346 hummock_time_travel_delta_fetch_batch_size: 1000,
347 hummock_gc_history_insert_batch_size: 1000,
348 hummock_time_travel_filter_out_objects_batch_size: 1000,
349 hummock_time_travel_filter_out_objects_v1: false,
350 hummock_time_travel_filter_out_objects_list_version_batch_size: 10,
351 hummock_time_travel_filter_out_objects_list_delta_batch_size: 1000,
352 min_delta_log_num_for_hummock_version_checkpoint: 1,
353 min_sst_retention_time_sec: 3600 * 24 * 7,
354 full_gc_interval_sec: 3600 * 24 * 7,
355 full_gc_object_limit: 100_000,
356 gc_history_retention_time_sec: 3600 * 24 * 7,
357 max_inflight_time_travel_query: 1000,
358 enable_committed_sst_sanity_check: false,
359 periodic_compaction_interval_sec: 300,
360 node_num_monitor_interval_sec: 10,
361 protect_drop_table_with_incoming_sink: false,
362 prometheus_endpoint: None,
363 prometheus_selector: None,
364 vpc_id: None,
365 security_group_id: None,
366 privatelink_endpoint_default_tags: None,
367 periodic_space_reclaim_compaction_interval_sec: 60,
368 telemetry_enabled: false,
369 periodic_ttl_reclaim_compaction_interval_sec: 60,
370 periodic_tombstone_reclaim_compaction_interval_sec: 60,
371 periodic_scheduling_compaction_group_split_interval_sec: 60,
372 enable_compaction_group_normalize: false,
373 max_normalize_splits_per_round: 4,
374 compact_task_table_size_partition_threshold_low: 128 * 1024 * 1024,
375 compact_task_table_size_partition_threshold_high: 512 * 1024 * 1024,
376 table_high_write_throughput_threshold: 128 * 1024 * 1024,
377 table_low_write_throughput_threshold: 64 * 1024 * 1024,
378 do_not_config_object_storage_lifecycle: true,
379 partition_vnode_count: 32,
380 compaction_task_max_heartbeat_interval_secs: 0,
381 compaction_task_max_progress_interval_secs: 1,
382 compaction_task_id_refill_capacity: 64,
383 compaction_config: None,
384 hybrid_partition_node_count: 4,
385 event_log_enabled: false,
386 event_log_channel_max_size: 1,
387 advertise_addr: "".to_owned(),
388 cached_traces_num: 1,
389 cached_traces_memory_limit_bytes: usize::MAX,
390 enable_trivial_move: true,
391 enable_check_task_level_overlap: true,
392 enable_dropped_column_reclaim: false,
393 object_store_config: ObjectStoreConfig::default(),
394 max_trivial_move_task_count_per_loop: 256,
395 max_get_task_probe_times: 5,
396 secret_store_private_key: Some(
397 hex::decode("0123456789abcdef0123456789abcdef").unwrap(),
398 ),
399 temp_secret_file_dir: "./secrets".to_owned(),
400 actor_cnt_per_worker_parallelism_hard_limit: usize::MAX,
401 actor_cnt_per_worker_parallelism_soft_limit: usize::MAX,
402 split_group_size_ratio: 0.9,
403 table_stat_high_write_throughput_ratio_for_split: 0.5,
404 table_stat_low_write_throughput_ratio_for_merge: 0.7,
405 table_stat_throuput_window_seconds_for_split: 60,
406 table_stat_throuput_window_seconds_for_merge: 240,
407 periodic_scheduling_compaction_group_merge_interval_sec: 60 * 10,
408 compaction_group_merge_dimension_threshold: 1.2,
409 license_key_path: None,
410 compute_client_config: RpcClientConfig::default(),
411 stream_client_config: RpcClientConfig::default(),
412 frontend_client_config: RpcClientConfig::default(),
413 redact_sql_option_keywords: Arc::new(Default::default()),
414 cdc_table_split_init_sleep_interval_splits: 1000,
415 cdc_table_split_init_sleep_duration_millis: 10,
416 cdc_table_split_init_insert_batch_size: 1000,
417 enable_legacy_table_migration: true,
418 refresh_scheduler_interval_sec: 60,
419 pause_on_next_bootstrap_offline: false,
420 table_change_log_insert_batch_size: 1000,
421 table_change_log_delete_batch_size: 1000,
422 }
423 }
424}
425
426impl MetaSrvEnv {
427 pub async fn new(
428 opts: MetaOpts,
429 mut init_system_params: SystemParams,
430 init_session_config: SessionConfig,
431 meta_store_impl: SqlMetaStore,
432 ) -> MetaResult<Self> {
433 let idle_manager = Arc::new(IdleManager::new(opts.max_idle_ms));
434 let stream_client_pool =
435 Arc::new(StreamClientPool::new(1, opts.stream_client_config.clone())); let frontend_client_pool = Arc::new(FrontendClientPool::new(
437 1,
438 opts.frontend_client_config.clone(),
439 ));
440 let event_log_manager = Arc::new(start_event_log_manager(
441 opts.event_log_enabled,
442 opts.event_log_channel_max_size,
443 ));
444
445 if opts.license_key_path.is_some()
448 && init_system_params.license_key
449 != system_param::default::license_key_opt().map(Into::into)
450 {
451 bail!(
452 "argument `--license-key-path` (or env var `RW_LICENSE_KEY_PATH`) and \
453 system parameter `license_key` (or env var `RW_LICENSE_KEY`) may not \
454 be set at the same time"
455 );
456 }
457
458 let cluster_first_launch = meta_store_impl.up().await.context(
459 "Failed to initialize the meta store, \
460 this may happen if there's existing metadata incompatible with the current version of RisingWave, \
461 e.g., downgrading from a newer release or a nightly build to an older one. \
462 For a single-node deployment, you may want to reset all data by deleting the data directory, \
463 typically located at `~/.risingwave`.",
464 )?;
465
466 let notification_manager =
467 Arc::new(NotificationManager::new(meta_store_impl.clone()).await);
468 let cluster_id = Cluster::find()
469 .one(&meta_store_impl.conn)
470 .await?
471 .map(|c| c.cluster_id.to_string().into())
472 .unwrap();
473
474 init_system_params.use_new_object_prefix_strategy = Some(cluster_first_launch);
480
481 let system_param_controller = Arc::new(
482 SystemParamsController::new(
483 meta_store_impl.clone(),
484 notification_manager.clone(),
485 init_system_params,
486 )
487 .await?,
488 );
489 let session_param_controller = Arc::new(
490 SessionParamsController::new(
491 meta_store_impl.clone(),
492 notification_manager.clone(),
493 init_session_config,
494 )
495 .await?,
496 );
497 Ok(Self {
498 id_gen_manager_impl: Arc::new(SqlIdGeneratorManager::new(&meta_store_impl.conn).await?),
499 system_param_manager_impl: system_param_controller,
500 session_param_manager_impl: session_param_controller,
501 meta_store_impl: meta_store_impl.clone(),
502 shared_actor_info: SharedActorInfos::new(notification_manager.clone()),
503 notification_manager,
504 stream_client_pool,
505 frontend_client_pool,
506 idle_manager,
507 event_log_manager,
508 cluster_id,
509 hummock_seq: Arc::new(SequenceGenerator::new(meta_store_impl.conn.clone())),
510 opts: opts.into(),
511 await_tree_reg: await_tree::Registry::new(Default::default()),
513 actor_id_generator: Arc::new(AtomicU32::new(0)),
514 })
515 }
516
517 pub fn meta_store(&self) -> SqlMetaStore {
518 self.meta_store_impl.clone()
519 }
520
521 pub fn meta_store_ref(&self) -> &SqlMetaStore {
522 &self.meta_store_impl
523 }
524
525 pub fn id_gen_manager(&self) -> &SqlIdGeneratorManagerRef {
526 &self.id_gen_manager_impl
527 }
528
529 pub fn notification_manager_ref(&self) -> NotificationManagerRef {
530 self.notification_manager.clone()
531 }
532
533 pub fn notification_manager(&self) -> &NotificationManager {
534 self.notification_manager.deref()
535 }
536
537 pub fn idle_manager_ref(&self) -> IdleManagerRef {
538 self.idle_manager.clone()
539 }
540
541 pub fn idle_manager(&self) -> &IdleManager {
542 self.idle_manager.deref()
543 }
544
545 pub fn actor_id_generator(&self) -> &AtomicU32 {
546 self.actor_id_generator.deref()
547 }
548
549 pub async fn system_params_reader(&self) -> SystemParamsReader {
550 self.system_param_manager_impl.get_params().await
551 }
552
553 pub fn system_params_manager_impl_ref(&self) -> SystemParamsControllerRef {
554 self.system_param_manager_impl.clone()
555 }
556
557 pub fn session_params_manager_impl_ref(&self) -> SessionParamsControllerRef {
558 self.session_param_manager_impl.clone()
559 }
560
561 pub fn stream_client_pool_ref(&self) -> StreamClientPoolRef {
562 self.stream_client_pool.clone()
563 }
564
565 pub fn stream_client_pool(&self) -> &StreamClientPool {
566 self.stream_client_pool.deref()
567 }
568
569 pub fn frontend_client_pool(&self) -> &FrontendClientPool {
570 self.frontend_client_pool.deref()
571 }
572
573 pub fn cluster_id(&self) -> &ClusterId {
574 &self.cluster_id
575 }
576
577 pub fn event_log_manager_ref(&self) -> EventLogManagerRef {
578 self.event_log_manager.clone()
579 }
580
581 pub fn await_tree_reg(&self) -> &await_tree::Registry {
582 &self.await_tree_reg
583 }
584
585 pub fn shared_actor_infos(&self) -> &SharedActorInfos {
586 &self.shared_actor_info
587 }
588}
589
#[cfg(any(test, feature = "test"))]
impl MetaSrvEnv {
    /// Creates an environment for tests with `MetaOpts::test(false)` and unmodified test
    /// system parameters.
    pub async fn for_test() -> Self {
        let default_opts = MetaOpts::test(false);
        Self::for_test_opts(default_opts, |_| {}).await
    }

    /// Creates an environment for tests from the given options.
    ///
    /// `on_test_system_params` is called once on the test system parameters before they are
    /// passed to [`MetaSrvEnv::new`], so a test can adjust individual parameters.
    ///
    /// # Panics
    ///
    /// Panics if [`MetaSrvEnv::new`] returns an error.
    pub async fn for_test_opts(
        opts: MetaOpts,
        on_test_system_params: impl FnOnce(&mut risingwave_pb::meta::PbSystemParams),
    ) -> Self {
        let mut test_params = risingwave_common::system_param::system_params_for_test();
        on_test_system_params(&mut test_params);

        // Bound in the same order in which the original call evaluated its arguments.
        let session_config = SessionConfig::default();
        let meta_store = SqlMetaStore::for_test().await;

        let env = Self::new(opts, test_params, session_config, meta_store).await;
        env.unwrap()
    }
}