1use std::ops::Deref;
16use std::path::PathBuf;
17use std::sync::Arc;
18
19use anyhow::Context;
20use risingwave_common::config::{
21 CompactionConfig, DefaultParallelism, ObjectStoreConfig, RpcClientConfig,
22};
23use risingwave_common::session_config::SessionConfig;
24use risingwave_common::system_param::reader::SystemParamsReader;
25use risingwave_common::{bail, system_param};
26use risingwave_meta_model::prelude::Cluster;
27use risingwave_pb::meta::SystemParams;
28use risingwave_rpc_client::{
29 FrontendClientPool, FrontendClientPoolRef, StreamClientPool, StreamClientPoolRef,
30};
31use sea_orm::EntityTrait;
32
33use crate::MetaResult;
34use crate::controller::SqlMetaStore;
35use crate::controller::id::{
36 IdGeneratorManager as SqlIdGeneratorManager, IdGeneratorManagerRef as SqlIdGeneratorManagerRef,
37};
38use crate::controller::session_params::{SessionParamsController, SessionParamsControllerRef};
39use crate::controller::system_param::{SystemParamsController, SystemParamsControllerRef};
40use crate::hummock::sequence::SequenceGenerator;
41use crate::manager::event_log::{EventLogManagerRef, start_event_log_manager};
42use crate::manager::{IdleManager, IdleManagerRef, NotificationManager, NotificationManagerRef};
43use crate::model::ClusterId;
44
/// Shared environment of the meta service.
///
/// Bundles the managers, client pools, identifiers and options that
/// `MetaSrvEnv::new` builds, so they can be passed around as one value. The
/// struct derives `Clone`; in `MetaSrvEnv::new` every field except
/// `meta_store_impl` and `cluster_id` is populated from an `Arc`.
#[derive(Clone)]
pub struct MetaSrvEnv {
    /// ID generator manager from `crate::controller::id`.
    id_gen_manager_impl: SqlIdGeneratorManagerRef,

    /// Controller for system parameters.
    system_param_manager_impl: SystemParamsControllerRef,

    /// Controller for session parameters.
    session_param_manager_impl: SessionParamsControllerRef,

    /// Handle to the SQL meta store.
    meta_store_impl: SqlMetaStore,

    /// Notification manager, constructed from the meta store.
    notification_manager: NotificationManagerRef,

    /// Pool of stream clients, configured from `MetaOpts::stream_client_config`.
    stream_client_pool: StreamClientPoolRef,

    /// Pool of frontend clients, configured from `MetaOpts::frontend_client_config`.
    frontend_client_pool: FrontendClientPoolRef,

    /// Idle manager, constructed from `MetaOpts::max_idle_ms`.
    idle_manager: IdleManagerRef,

    /// Event log manager, started from `MetaOpts::event_log_enabled` and
    /// `MetaOpts::event_log_channel_max_size`.
    event_log_manager: EventLogManagerRef,

    /// Identifier of the cluster, read from the `Cluster` entity in the meta store.
    cluster_id: ClusterId,

    /// Sequence generator built on the meta store connection.
    pub hummock_seq: Arc<SequenceGenerator>,

    /// Options this environment was created with.
    pub opts: Arc<MetaOpts>,
}
83
/// Options for the meta service, stored in `MetaSrvEnv::opts`.
///
/// The struct derives `serde::Serialize` and no field carries a `serde`
/// attribute here, so every field (including `secret_store_private_key`) is
/// part of the serialized form.
///
/// NOTE(review): the per-field notes below restate what the field names and
/// types say (e.g. a `_sec` suffix is read as seconds, `_ms` as
/// milliseconds). They have not been checked against the code that consumes
/// each option — confirm before relying on them.
#[derive(Clone, serde::Serialize)]
pub struct MetaOpts {
    /// Recovery switch; the only value `MetaOpts::test` takes from its caller.
    pub enable_recovery: bool,
    /// Presumably turns off automatic parallelism control when `true`.
    pub disable_automatic_parallelism_control: bool,
    /// Batch size for parallelism control.
    pub parallelism_control_batch_size: usize,
    /// Trigger period for parallelism control, in seconds.
    pub parallelism_control_trigger_period_sec: u64,
    /// Delay before the first parallelism control trigger, in seconds.
    pub parallelism_control_trigger_first_delay_sec: u64,
    /// Number of in-flight barriers.
    pub in_flight_barrier_nums: usize,
    /// Idle limit in milliseconds; passed to `IdleManager::new` by `MetaSrvEnv::new`.
    pub max_idle_ms: u64,
    /// Presumably enables the deterministic compaction test mode.
    pub compaction_deterministic_test: bool,
    /// Default parallelism setting.
    pub default_parallelism: DefaultParallelism,

    // --- Vacuum, version checkpointing and time travel ---
    /// Vacuum interval, in seconds.
    pub vacuum_interval_sec: u64,
    /// Vacuum spin interval, in milliseconds.
    pub vacuum_spin_interval_ms: u64,
    /// Time-travel vacuum interval, in seconds.
    pub time_travel_vacuum_interval_sec: u64,
    /// Hummock version checkpoint interval, in seconds.
    pub hummock_version_checkpoint_interval_sec: u64,
    /// Presumably enables archiving of Hummock data.
    pub enable_hummock_data_archive: bool,
    /// Time-travel snapshot interval (unit not visible here — TODO confirm).
    pub hummock_time_travel_snapshot_interval: u64,
    /// Batch size when fetching SST info for time travel.
    pub hummock_time_travel_sst_info_fetch_batch_size: usize,
    /// Batch size when inserting SST info for time travel.
    pub hummock_time_travel_sst_info_insert_batch_size: usize,
    /// Batch size when inserting epoch-to-version rows for time travel.
    pub hummock_time_travel_epoch_version_insert_batch_size: usize,
    /// Batch size when inserting GC history.
    pub hummock_gc_history_insert_batch_size: usize,
    /// Batch size when filtering out objects for time travel.
    pub hummock_time_travel_filter_out_objects_batch_size: usize,
    /// Minimum number of delta logs for a Hummock version checkpoint.
    pub min_delta_log_num_for_hummock_version_checkpoint: u64,

    // --- Garbage collection ---
    /// Minimum SST retention time, in seconds.
    pub min_sst_retention_time_sec: u64,
    /// Full GC interval, in seconds.
    pub full_gc_interval_sec: u64,
    /// Object limit for full GC.
    pub full_gc_object_limit: u64,
    /// GC history retention time, in seconds.
    pub gc_history_retention_time_sec: u64,
    /// Maximum number of in-flight time-travel queries.
    pub max_inflight_time_travel_query: u64,
    /// Presumably enables a sanity check on committed SSTs.
    pub enable_committed_sst_sanity_check: bool,
    /// Periodic compaction interval, in seconds.
    pub periodic_compaction_interval_sec: u64,
    /// Node-number monitor interval, in seconds.
    pub node_num_monitor_interval_sec: u64,

    // --- Monitoring and cloud networking ---
    /// Optional Prometheus endpoint.
    pub prometheus_endpoint: Option<String>,

    /// Optional Prometheus selector.
    pub prometheus_selector: Option<String>,

    /// Optional VPC id.
    pub vpc_id: Option<String>,

    /// Optional security group id.
    pub security_group_id: Option<String>,

    /// Optional default tags for PrivateLink endpoints, as string pairs.
    pub privatelink_endpoint_default_tags: Option<Vec<(String, String)>>,

    // --- Periodic compaction scheduling ---
    /// Periodic space-reclaim compaction interval, in seconds.
    pub periodic_space_reclaim_compaction_interval_sec: u64,

    /// Whether telemetry is enabled.
    pub telemetry_enabled: bool,
    /// Periodic TTL-reclaim compaction interval, in seconds.
    pub periodic_ttl_reclaim_compaction_interval_sec: u64,

    /// Periodic tombstone-reclaim compaction interval, in seconds.
    pub periodic_tombstone_reclaim_compaction_interval_sec: u64,

    /// Interval for scheduling compaction group splits, in seconds.
    pub periodic_scheduling_compaction_group_split_interval_sec: u64,

    /// Presumably skips configuring the object storage lifecycle when `true`.
    pub do_not_config_object_storage_lifecycle: bool,

    /// Partition vnode count.
    pub partition_vnode_count: u32,

    /// Threshold above which a table counts as high write throughput.
    pub table_high_write_throughput_threshold: u64,
    /// Threshold below which a table counts as low write throughput.
    pub table_low_write_throughput_threshold: u64,

    /// Maximum heartbeat interval for a compaction task, in seconds.
    pub compaction_task_max_heartbeat_interval_secs: u64,
    /// Maximum progress interval for a compaction task, in seconds.
    pub compaction_task_max_progress_interval_secs: u64,
    /// Optional compaction configuration.
    pub compaction_config: Option<CompactionConfig>,

    /// Hybrid partition node count.
    pub hybrid_partition_node_count: u32,

    // --- Event log and tracing ---
    /// Passed to `start_event_log_manager` by `MetaSrvEnv::new`.
    pub event_log_enabled: bool,
    /// Passed to `start_event_log_manager` by `MetaSrvEnv::new`.
    pub event_log_channel_max_size: u32,
    /// Advertised address of this node.
    pub advertise_addr: String,
    /// Number of cached traces.
    pub cached_traces_num: u32,
    /// Memory limit for cached traces, in bytes.
    pub cached_traces_memory_limit_bytes: usize,

    // --- Compaction behaviour ---
    /// Presumably enables trivial-move compaction.
    pub enable_trivial_move: bool,

    /// Presumably enables the task-level overlap check.
    pub enable_check_task_level_overlap: bool,
    /// Presumably enables reclaiming dropped columns.
    pub enable_dropped_column_reclaim: bool,

    /// Size ratio used when splitting a compaction group.
    pub split_group_size_ratio: f64,

    /// High-write-throughput ratio used for split decisions.
    pub table_stat_high_write_throughput_ratio_for_split: f64,

    /// Low-write-throughput ratio used for merge decisions.
    pub table_stat_low_write_throughput_ratio_for_merge: f64,

    /// Throughput window for split decisions, in seconds.
    /// (sic: `throuput` — the spelling is part of the public field name.)
    pub table_stat_throuput_window_seconds_for_split: usize,

    /// Throughput window for merge decisions, in seconds.
    /// (sic: `throuput` — the spelling is part of the public field name.)
    pub table_stat_throuput_window_seconds_for_merge: usize,

    /// Object store configuration.
    pub object_store_config: ObjectStoreConfig,

    /// Maximum number of trivial-move tasks per loop.
    pub max_trivial_move_task_count_per_loop: usize,

    /// Maximum number of probes when getting a task.
    pub max_get_task_probe_times: usize,

    /// Lower table-size partition threshold for compact tasks.
    pub compact_task_table_size_partition_threshold_low: u64,
    /// Upper table-size partition threshold for compact tasks.
    pub compact_task_table_size_partition_threshold_high: u64,

    /// Interval for scheduling compaction group merges, in seconds.
    pub periodic_scheduling_compaction_group_merge_interval_sec: u64,

    /// Dimension threshold for merging compaction groups.
    pub compaction_group_merge_dimension_threshold: f64,

    // --- Secrets, limits and licensing ---
    /// Optional private key for the secret store.
    ///
    /// NOTE(review): this struct derives `Serialize` and the field is not
    /// skipped, so the key bytes end up in serialized output — confirm that
    /// is intended.
    pub secret_store_private_key: Option<Vec<u8>>,
    /// Directory for temporary secret files.
    pub temp_secret_file_dir: String,

    /// Hard limit on the actor count per unit of worker parallelism.
    pub actor_cnt_per_worker_parallelism_hard_limit: usize,
    /// Soft limit on the actor count per unit of worker parallelism.
    pub actor_cnt_per_worker_parallelism_soft_limit: usize,

    /// Optional path to a license key file. `MetaSrvEnv::new` fails when this
    /// is set while the `license_key` system parameter differs from its
    /// default.
    pub license_key_path: Option<PathBuf>,

    // --- RPC client configuration ---
    /// RPC client configuration for compute clients.
    pub compute_client_config: RpcClientConfig,
    /// RPC client configuration passed to the `StreamClientPool` in `MetaSrvEnv::new`.
    pub stream_client_config: RpcClientConfig,
    /// RPC client configuration passed to the `FrontendClientPool` in `MetaSrvEnv::new`.
    pub frontend_client_config: RpcClientConfig,
}
268
269impl MetaOpts {
270 pub fn test(enable_recovery: bool) -> Self {
272 Self {
273 enable_recovery,
274 disable_automatic_parallelism_control: false,
275 parallelism_control_batch_size: 1,
276 parallelism_control_trigger_period_sec: 10,
277 parallelism_control_trigger_first_delay_sec: 30,
278 in_flight_barrier_nums: 40,
279 max_idle_ms: 0,
280 compaction_deterministic_test: false,
281 default_parallelism: DefaultParallelism::Full,
282 vacuum_interval_sec: 30,
283 time_travel_vacuum_interval_sec: 30,
284 vacuum_spin_interval_ms: 0,
285 hummock_version_checkpoint_interval_sec: 30,
286 enable_hummock_data_archive: false,
287 hummock_time_travel_snapshot_interval: 0,
288 hummock_time_travel_sst_info_fetch_batch_size: 10_000,
289 hummock_time_travel_sst_info_insert_batch_size: 10,
290 hummock_time_travel_epoch_version_insert_batch_size: 1000,
291 hummock_gc_history_insert_batch_size: 1000,
292 hummock_time_travel_filter_out_objects_batch_size: 1000,
293 min_delta_log_num_for_hummock_version_checkpoint: 1,
294 min_sst_retention_time_sec: 3600 * 24 * 7,
295 full_gc_interval_sec: 3600 * 24 * 7,
296 full_gc_object_limit: 100_000,
297 gc_history_retention_time_sec: 3600 * 24 * 7,
298 max_inflight_time_travel_query: 1000,
299 enable_committed_sst_sanity_check: false,
300 periodic_compaction_interval_sec: 60,
301 node_num_monitor_interval_sec: 10,
302 prometheus_endpoint: None,
303 prometheus_selector: None,
304 vpc_id: None,
305 security_group_id: None,
306 privatelink_endpoint_default_tags: None,
307 periodic_space_reclaim_compaction_interval_sec: 60,
308 telemetry_enabled: false,
309 periodic_ttl_reclaim_compaction_interval_sec: 60,
310 periodic_tombstone_reclaim_compaction_interval_sec: 60,
311 periodic_scheduling_compaction_group_split_interval_sec: 60,
312 compact_task_table_size_partition_threshold_low: 128 * 1024 * 1024,
313 compact_task_table_size_partition_threshold_high: 512 * 1024 * 1024,
314 table_high_write_throughput_threshold: 128 * 1024 * 1024,
315 table_low_write_throughput_threshold: 64 * 1024 * 1024,
316 do_not_config_object_storage_lifecycle: true,
317 partition_vnode_count: 32,
318 compaction_task_max_heartbeat_interval_secs: 0,
319 compaction_task_max_progress_interval_secs: 1,
320 compaction_config: None,
321 hybrid_partition_node_count: 4,
322 event_log_enabled: false,
323 event_log_channel_max_size: 1,
324 advertise_addr: "".to_owned(),
325 cached_traces_num: 1,
326 cached_traces_memory_limit_bytes: usize::MAX,
327 enable_trivial_move: true,
328 enable_check_task_level_overlap: true,
329 enable_dropped_column_reclaim: false,
330 object_store_config: ObjectStoreConfig::default(),
331 max_trivial_move_task_count_per_loop: 256,
332 max_get_task_probe_times: 5,
333 secret_store_private_key: Some(
334 hex::decode("0123456789abcdef0123456789abcdef").unwrap(),
335 ),
336 temp_secret_file_dir: "./secrets".to_owned(),
337 actor_cnt_per_worker_parallelism_hard_limit: usize::MAX,
338 actor_cnt_per_worker_parallelism_soft_limit: usize::MAX,
339 split_group_size_ratio: 0.9,
340 table_stat_high_write_throughput_ratio_for_split: 0.5,
341 table_stat_low_write_throughput_ratio_for_merge: 0.7,
342 table_stat_throuput_window_seconds_for_split: 60,
343 table_stat_throuput_window_seconds_for_merge: 240,
344 periodic_scheduling_compaction_group_merge_interval_sec: 60 * 10,
345 compaction_group_merge_dimension_threshold: 1.2,
346 license_key_path: None,
347 compute_client_config: RpcClientConfig::default(),
348 stream_client_config: RpcClientConfig::default(),
349 frontend_client_config: RpcClientConfig::default(),
350 }
351 }
352}
353
354impl MetaSrvEnv {
355 pub async fn new(
356 opts: MetaOpts,
357 mut init_system_params: SystemParams,
358 init_session_config: SessionConfig,
359 meta_store_impl: SqlMetaStore,
360 ) -> MetaResult<Self> {
361 let idle_manager = Arc::new(IdleManager::new(opts.max_idle_ms));
362 let stream_client_pool =
363 Arc::new(StreamClientPool::new(1, opts.stream_client_config.clone())); let frontend_client_pool = Arc::new(FrontendClientPool::new(
365 1,
366 opts.frontend_client_config.clone(),
367 ));
368 let event_log_manager = Arc::new(start_event_log_manager(
369 opts.event_log_enabled,
370 opts.event_log_channel_max_size,
371 ));
372
373 if opts.license_key_path.is_some()
376 && init_system_params.license_key
377 != system_param::default::license_key_opt().map(Into::into)
378 {
379 bail!(
380 "argument `--license-key-path` (or env var `RW_LICENSE_KEY_PATH`) and \
381 system parameter `license_key` (or env var `RW_LICENSE_KEY`) may not \
382 be set at the same time"
383 );
384 }
385
386 let cluster_first_launch = meta_store_impl.up().await.context(
387 "Failed to initialize the meta store, \
388 this may happen if there's existing metadata incompatible with the current version of RisingWave, \
389 e.g., downgrading from a newer release or a nightly build to an older one. \
390 For a single-node deployment, you may want to reset all data by deleting the data directory, \
391 typically located at `~/.risingwave`.",
392 )?;
393
394 let notification_manager =
395 Arc::new(NotificationManager::new(meta_store_impl.clone()).await);
396 let cluster_id = Cluster::find()
397 .one(&meta_store_impl.conn)
398 .await?
399 .map(|c| c.cluster_id.to_string().into())
400 .unwrap();
401
402 init_system_params.use_new_object_prefix_strategy = Some(cluster_first_launch);
408
409 let system_param_controller = Arc::new(
410 SystemParamsController::new(
411 meta_store_impl.clone(),
412 notification_manager.clone(),
413 init_system_params,
414 )
415 .await?,
416 );
417 let session_param_controller = Arc::new(
418 SessionParamsController::new(
419 meta_store_impl.clone(),
420 notification_manager.clone(),
421 init_session_config,
422 )
423 .await?,
424 );
425 Ok(Self {
426 id_gen_manager_impl: Arc::new(SqlIdGeneratorManager::new(&meta_store_impl.conn).await?),
427 system_param_manager_impl: system_param_controller,
428 session_param_manager_impl: session_param_controller,
429 meta_store_impl: meta_store_impl.clone(),
430 notification_manager,
431 stream_client_pool,
432 frontend_client_pool,
433 idle_manager,
434 event_log_manager,
435 cluster_id,
436 hummock_seq: Arc::new(SequenceGenerator::new(meta_store_impl.conn.clone())),
437 opts: opts.into(),
438 })
439 }
440
441 pub fn meta_store(&self) -> SqlMetaStore {
442 self.meta_store_impl.clone()
443 }
444
445 pub fn meta_store_ref(&self) -> &SqlMetaStore {
446 &self.meta_store_impl
447 }
448
449 pub fn id_gen_manager(&self) -> &SqlIdGeneratorManagerRef {
450 &self.id_gen_manager_impl
451 }
452
453 pub fn notification_manager_ref(&self) -> NotificationManagerRef {
454 self.notification_manager.clone()
455 }
456
457 pub fn notification_manager(&self) -> &NotificationManager {
458 self.notification_manager.deref()
459 }
460
461 pub fn idle_manager_ref(&self) -> IdleManagerRef {
462 self.idle_manager.clone()
463 }
464
465 pub fn idle_manager(&self) -> &IdleManager {
466 self.idle_manager.deref()
467 }
468
469 pub async fn system_params_reader(&self) -> SystemParamsReader {
470 self.system_param_manager_impl.get_params().await
471 }
472
473 pub fn system_params_manager_impl_ref(&self) -> SystemParamsControllerRef {
474 self.system_param_manager_impl.clone()
475 }
476
477 pub fn session_params_manager_impl_ref(&self) -> SessionParamsControllerRef {
478 self.session_param_manager_impl.clone()
479 }
480
481 pub fn stream_client_pool_ref(&self) -> StreamClientPoolRef {
482 self.stream_client_pool.clone()
483 }
484
485 pub fn stream_client_pool(&self) -> &StreamClientPool {
486 self.stream_client_pool.deref()
487 }
488
489 pub fn frontend_client_pool(&self) -> &FrontendClientPool {
490 self.frontend_client_pool.deref()
491 }
492
493 pub fn cluster_id(&self) -> &ClusterId {
494 &self.cluster_id
495 }
496
497 pub fn event_log_manager_ref(&self) -> EventLogManagerRef {
498 self.event_log_manager.clone()
499 }
500}
501
#[cfg(any(test, feature = "test"))]
impl MetaSrvEnv {
    /// Creates an environment from `MetaOpts::test(false)`.
    ///
    /// # Panics
    /// Panics under the same conditions as [`MetaSrvEnv::for_test_opts`].
    pub async fn for_test() -> Self {
        let opts = MetaOpts::test(false);
        Self::for_test_opts(opts).await
    }

    /// Creates an environment from the given options, using the test system
    /// parameters, a default session config and `SqlMetaStore::for_test()`.
    ///
    /// # Panics
    /// Panics if `MetaSrvEnv::new` returns an error.
    pub async fn for_test_opts(opts: MetaOpts) -> Self {
        let system_params = system_param::system_params_for_test();
        let session_config = SessionConfig::default();
        let meta_store = SqlMetaStore::for_test().await;

        let env = Self::new(opts, system_params, session_config, meta_store).await;
        env.unwrap()
    }
}