1use std::ops::Deref;
16use std::path::PathBuf;
17use std::sync::Arc;
18
19use anyhow::Context;
20use risingwave_common::config::{
21 CompactionConfig, DefaultParallelism, ObjectStoreConfig, RpcClientConfig,
22};
23use risingwave_common::session_config::SessionConfig;
24use risingwave_common::system_param::reader::SystemParamsReader;
25use risingwave_common::{bail, system_param};
26use risingwave_meta_model::prelude::Cluster;
27use risingwave_pb::meta::SystemParams;
28use risingwave_rpc_client::{
29 FrontendClientPool, FrontendClientPoolRef, StreamClientPool, StreamClientPoolRef,
30};
31use risingwave_sqlparser::ast::RedactSqlOptionKeywordsRef;
32use sea_orm::EntityTrait;
33
34use crate::MetaResult;
35use crate::controller::SqlMetaStore;
36use crate::controller::id::{
37 IdGeneratorManager as SqlIdGeneratorManager, IdGeneratorManagerRef as SqlIdGeneratorManagerRef,
38};
39use crate::controller::session_params::{SessionParamsController, SessionParamsControllerRef};
40use crate::controller::system_param::{SystemParamsController, SystemParamsControllerRef};
41use crate::hummock::sequence::SequenceGenerator;
42use crate::manager::event_log::{EventLogManagerRef, start_event_log_manager};
43use crate::manager::{IdleManager, IdleManagerRef, NotificationManager, NotificationManagerRef};
44use crate::model::ClusterId;
45
46#[derive(Clone)]
49pub struct MetaSrvEnv {
50 id_gen_manager_impl: SqlIdGeneratorManagerRef,
52
53 system_param_manager_impl: SystemParamsControllerRef,
55
56 session_param_manager_impl: SessionParamsControllerRef,
58
59 meta_store_impl: SqlMetaStore,
61
62 notification_manager: NotificationManagerRef,
64
65 stream_client_pool: StreamClientPoolRef,
67
68 frontend_client_pool: FrontendClientPoolRef,
70
71 idle_manager: IdleManagerRef,
73
74 event_log_manager: EventLogManagerRef,
75
76 cluster_id: ClusterId,
78
79 pub hummock_seq: Arc<SequenceGenerator>,
80
81 await_tree_reg: await_tree::Registry,
83
84 pub opts: Arc<MetaOpts>,
86}
87
88#[derive(Clone, serde::Serialize)]
90pub struct MetaOpts {
91 pub enable_recovery: bool,
94 pub disable_automatic_parallelism_control: bool,
96 pub parallelism_control_batch_size: usize,
98 pub parallelism_control_trigger_period_sec: u64,
100 pub parallelism_control_trigger_first_delay_sec: u64,
102 pub in_flight_barrier_nums: usize,
104 pub max_idle_ms: u64,
107 pub compaction_deterministic_test: bool,
109 pub default_parallelism: DefaultParallelism,
111
112 pub vacuum_interval_sec: u64,
115 pub vacuum_spin_interval_ms: u64,
118 pub time_travel_vacuum_interval_sec: u64,
119 pub hummock_version_checkpoint_interval_sec: u64,
121 pub enable_hummock_data_archive: bool,
122 pub hummock_time_travel_snapshot_interval: u64,
123 pub hummock_time_travel_sst_info_fetch_batch_size: usize,
124 pub hummock_time_travel_sst_info_insert_batch_size: usize,
125 pub hummock_time_travel_epoch_version_insert_batch_size: usize,
126 pub hummock_gc_history_insert_batch_size: usize,
127 pub hummock_time_travel_filter_out_objects_batch_size: usize,
128 pub hummock_time_travel_filter_out_objects_v1: bool,
129 pub hummock_time_travel_filter_out_objects_list_version_batch_size: usize,
130 pub hummock_time_travel_filter_out_objects_list_delta_batch_size: usize,
131 pub min_delta_log_num_for_hummock_version_checkpoint: u64,
136 pub min_sst_retention_time_sec: u64,
139 pub full_gc_interval_sec: u64,
141 pub full_gc_object_limit: u64,
143 pub gc_history_retention_time_sec: u64,
145 pub max_inflight_time_travel_query: u64,
147 pub enable_committed_sst_sanity_check: bool,
149 pub periodic_compaction_interval_sec: u64,
151 pub node_num_monitor_interval_sec: u64,
153 pub protect_drop_table_with_incoming_sink: bool,
155 pub prometheus_endpoint: Option<String>,
161
162 pub prometheus_selector: Option<String>,
164
165 pub vpc_id: Option<String>,
167
168 pub security_group_id: Option<String>,
170
171 pub privatelink_endpoint_default_tags: Option<Vec<(String, String)>>,
175
176 pub periodic_space_reclaim_compaction_interval_sec: u64,
178
179 pub telemetry_enabled: bool,
181 pub periodic_ttl_reclaim_compaction_interval_sec: u64,
183
184 pub periodic_tombstone_reclaim_compaction_interval_sec: u64,
186
187 pub periodic_scheduling_compaction_group_split_interval_sec: u64,
189
190 pub do_not_config_object_storage_lifecycle: bool,
192
193 pub partition_vnode_count: u32,
194
195 pub table_high_write_throughput_threshold: u64,
197 pub table_low_write_throughput_threshold: u64,
199
200 pub compaction_task_max_heartbeat_interval_secs: u64,
201 pub compaction_task_max_progress_interval_secs: u64,
202 pub compaction_config: Option<CompactionConfig>,
203
204 pub hybrid_partition_node_count: u32,
212
213 pub event_log_enabled: bool,
214 pub event_log_channel_max_size: u32,
215 pub advertise_addr: String,
216 pub cached_traces_num: u32,
219 pub cached_traces_memory_limit_bytes: usize,
222
223 pub enable_trivial_move: bool,
225
226 pub enable_check_task_level_overlap: bool,
228 pub enable_dropped_column_reclaim: bool,
229
230 pub split_group_size_ratio: f64,
232
233 pub table_stat_high_write_throughput_ratio_for_split: f64,
235
236 pub table_stat_low_write_throughput_ratio_for_merge: f64,
238
239 pub table_stat_throuput_window_seconds_for_split: usize,
241
242 pub table_stat_throuput_window_seconds_for_merge: usize,
244
245 pub object_store_config: ObjectStoreConfig,
247
248 pub max_trivial_move_task_count_per_loop: usize,
250
251 pub max_get_task_probe_times: usize,
253
254 pub compact_task_table_size_partition_threshold_low: u64,
255 pub compact_task_table_size_partition_threshold_high: u64,
256
257 pub periodic_scheduling_compaction_group_merge_interval_sec: u64,
258
259 pub compaction_group_merge_dimension_threshold: f64,
260
261 pub secret_store_private_key: Option<Vec<u8>>,
263 pub temp_secret_file_dir: String,
265
266 pub actor_cnt_per_worker_parallelism_hard_limit: usize,
268 pub actor_cnt_per_worker_parallelism_soft_limit: usize,
269
270 pub license_key_path: Option<PathBuf>,
271
272 pub compute_client_config: RpcClientConfig,
273 pub stream_client_config: RpcClientConfig,
274 pub frontend_client_config: RpcClientConfig,
275 pub redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
276}
277
278impl MetaOpts {
279 pub fn test(enable_recovery: bool) -> Self {
281 Self {
282 enable_recovery,
283 disable_automatic_parallelism_control: false,
284 parallelism_control_batch_size: 1,
285 parallelism_control_trigger_period_sec: 10,
286 parallelism_control_trigger_first_delay_sec: 30,
287 in_flight_barrier_nums: 40,
288 max_idle_ms: 0,
289 compaction_deterministic_test: false,
290 default_parallelism: DefaultParallelism::Full,
291 vacuum_interval_sec: 30,
292 time_travel_vacuum_interval_sec: 30,
293 vacuum_spin_interval_ms: 0,
294 hummock_version_checkpoint_interval_sec: 30,
295 enable_hummock_data_archive: false,
296 hummock_time_travel_snapshot_interval: 0,
297 hummock_time_travel_sst_info_fetch_batch_size: 10_000,
298 hummock_time_travel_sst_info_insert_batch_size: 10,
299 hummock_time_travel_epoch_version_insert_batch_size: 1000,
300 hummock_gc_history_insert_batch_size: 1000,
301 hummock_time_travel_filter_out_objects_batch_size: 1000,
302 hummock_time_travel_filter_out_objects_v1: false,
303 hummock_time_travel_filter_out_objects_list_version_batch_size: 10,
304 hummock_time_travel_filter_out_objects_list_delta_batch_size: 1000,
305 min_delta_log_num_for_hummock_version_checkpoint: 1,
306 min_sst_retention_time_sec: 3600 * 24 * 7,
307 full_gc_interval_sec: 3600 * 24 * 7,
308 full_gc_object_limit: 100_000,
309 gc_history_retention_time_sec: 3600 * 24 * 7,
310 max_inflight_time_travel_query: 1000,
311 enable_committed_sst_sanity_check: false,
312 periodic_compaction_interval_sec: 60,
313 node_num_monitor_interval_sec: 10,
314 protect_drop_table_with_incoming_sink: false,
315 prometheus_endpoint: None,
316 prometheus_selector: None,
317 vpc_id: None,
318 security_group_id: None,
319 privatelink_endpoint_default_tags: None,
320 periodic_space_reclaim_compaction_interval_sec: 60,
321 telemetry_enabled: false,
322 periodic_ttl_reclaim_compaction_interval_sec: 60,
323 periodic_tombstone_reclaim_compaction_interval_sec: 60,
324 periodic_scheduling_compaction_group_split_interval_sec: 60,
325 compact_task_table_size_partition_threshold_low: 128 * 1024 * 1024,
326 compact_task_table_size_partition_threshold_high: 512 * 1024 * 1024,
327 table_high_write_throughput_threshold: 128 * 1024 * 1024,
328 table_low_write_throughput_threshold: 64 * 1024 * 1024,
329 do_not_config_object_storage_lifecycle: true,
330 partition_vnode_count: 32,
331 compaction_task_max_heartbeat_interval_secs: 0,
332 compaction_task_max_progress_interval_secs: 1,
333 compaction_config: None,
334 hybrid_partition_node_count: 4,
335 event_log_enabled: false,
336 event_log_channel_max_size: 1,
337 advertise_addr: "".to_owned(),
338 cached_traces_num: 1,
339 cached_traces_memory_limit_bytes: usize::MAX,
340 enable_trivial_move: true,
341 enable_check_task_level_overlap: true,
342 enable_dropped_column_reclaim: false,
343 object_store_config: ObjectStoreConfig::default(),
344 max_trivial_move_task_count_per_loop: 256,
345 max_get_task_probe_times: 5,
346 secret_store_private_key: Some(
347 hex::decode("0123456789abcdef0123456789abcdef").unwrap(),
348 ),
349 temp_secret_file_dir: "./secrets".to_owned(),
350 actor_cnt_per_worker_parallelism_hard_limit: usize::MAX,
351 actor_cnt_per_worker_parallelism_soft_limit: usize::MAX,
352 split_group_size_ratio: 0.9,
353 table_stat_high_write_throughput_ratio_for_split: 0.5,
354 table_stat_low_write_throughput_ratio_for_merge: 0.7,
355 table_stat_throuput_window_seconds_for_split: 60,
356 table_stat_throuput_window_seconds_for_merge: 240,
357 periodic_scheduling_compaction_group_merge_interval_sec: 60 * 10,
358 compaction_group_merge_dimension_threshold: 1.2,
359 license_key_path: None,
360 compute_client_config: RpcClientConfig::default(),
361 stream_client_config: RpcClientConfig::default(),
362 frontend_client_config: RpcClientConfig::default(),
363 redact_sql_option_keywords: Arc::new(Default::default()),
364 }
365 }
366}
367
368impl MetaSrvEnv {
369 pub async fn new(
370 opts: MetaOpts,
371 mut init_system_params: SystemParams,
372 init_session_config: SessionConfig,
373 meta_store_impl: SqlMetaStore,
374 ) -> MetaResult<Self> {
375 let idle_manager = Arc::new(IdleManager::new(opts.max_idle_ms));
376 let stream_client_pool =
377 Arc::new(StreamClientPool::new(1, opts.stream_client_config.clone())); let frontend_client_pool = Arc::new(FrontendClientPool::new(
379 1,
380 opts.frontend_client_config.clone(),
381 ));
382 let event_log_manager = Arc::new(start_event_log_manager(
383 opts.event_log_enabled,
384 opts.event_log_channel_max_size,
385 ));
386
387 if opts.license_key_path.is_some()
390 && init_system_params.license_key
391 != system_param::default::license_key_opt().map(Into::into)
392 {
393 bail!(
394 "argument `--license-key-path` (or env var `RW_LICENSE_KEY_PATH`) and \
395 system parameter `license_key` (or env var `RW_LICENSE_KEY`) may not \
396 be set at the same time"
397 );
398 }
399
400 let cluster_first_launch = meta_store_impl.up().await.context(
401 "Failed to initialize the meta store, \
402 this may happen if there's existing metadata incompatible with the current version of RisingWave, \
403 e.g., downgrading from a newer release or a nightly build to an older one. \
404 For a single-node deployment, you may want to reset all data by deleting the data directory, \
405 typically located at `~/.risingwave`.",
406 )?;
407
408 let notification_manager =
409 Arc::new(NotificationManager::new(meta_store_impl.clone()).await);
410 let cluster_id = Cluster::find()
411 .one(&meta_store_impl.conn)
412 .await?
413 .map(|c| c.cluster_id.to_string().into())
414 .unwrap();
415
416 init_system_params.use_new_object_prefix_strategy = Some(cluster_first_launch);
422
423 let system_param_controller = Arc::new(
424 SystemParamsController::new(
425 meta_store_impl.clone(),
426 notification_manager.clone(),
427 init_system_params,
428 )
429 .await?,
430 );
431 let session_param_controller = Arc::new(
432 SessionParamsController::new(
433 meta_store_impl.clone(),
434 notification_manager.clone(),
435 init_session_config,
436 )
437 .await?,
438 );
439 Ok(Self {
440 id_gen_manager_impl: Arc::new(SqlIdGeneratorManager::new(&meta_store_impl.conn).await?),
441 system_param_manager_impl: system_param_controller,
442 session_param_manager_impl: session_param_controller,
443 meta_store_impl: meta_store_impl.clone(),
444 notification_manager,
445 stream_client_pool,
446 frontend_client_pool,
447 idle_manager,
448 event_log_manager,
449 cluster_id,
450 hummock_seq: Arc::new(SequenceGenerator::new(meta_store_impl.conn.clone())),
451 opts: opts.into(),
452 await_tree_reg: await_tree::Registry::new(Default::default()),
454 })
455 }
456
457 pub fn meta_store(&self) -> SqlMetaStore {
458 self.meta_store_impl.clone()
459 }
460
461 pub fn meta_store_ref(&self) -> &SqlMetaStore {
462 &self.meta_store_impl
463 }
464
465 pub fn id_gen_manager(&self) -> &SqlIdGeneratorManagerRef {
466 &self.id_gen_manager_impl
467 }
468
469 pub fn notification_manager_ref(&self) -> NotificationManagerRef {
470 self.notification_manager.clone()
471 }
472
473 pub fn notification_manager(&self) -> &NotificationManager {
474 self.notification_manager.deref()
475 }
476
477 pub fn idle_manager_ref(&self) -> IdleManagerRef {
478 self.idle_manager.clone()
479 }
480
481 pub fn idle_manager(&self) -> &IdleManager {
482 self.idle_manager.deref()
483 }
484
485 pub async fn system_params_reader(&self) -> SystemParamsReader {
486 self.system_param_manager_impl.get_params().await
487 }
488
489 pub fn system_params_manager_impl_ref(&self) -> SystemParamsControllerRef {
490 self.system_param_manager_impl.clone()
491 }
492
493 pub fn session_params_manager_impl_ref(&self) -> SessionParamsControllerRef {
494 self.session_param_manager_impl.clone()
495 }
496
497 pub fn stream_client_pool_ref(&self) -> StreamClientPoolRef {
498 self.stream_client_pool.clone()
499 }
500
501 pub fn stream_client_pool(&self) -> &StreamClientPool {
502 self.stream_client_pool.deref()
503 }
504
505 pub fn frontend_client_pool(&self) -> &FrontendClientPool {
506 self.frontend_client_pool.deref()
507 }
508
509 pub fn cluster_id(&self) -> &ClusterId {
510 &self.cluster_id
511 }
512
513 pub fn event_log_manager_ref(&self) -> EventLogManagerRef {
514 self.event_log_manager.clone()
515 }
516
517 pub fn await_tree_reg(&self) -> &await_tree::Registry {
518 &self.await_tree_reg
519 }
520}
521
#[cfg(any(test, feature = "test"))]
impl MetaSrvEnv {
    /// Creates an environment for tests using `MetaOpts::test(false)` and unmodified test
    /// system parameters.
    pub async fn for_test() -> Self {
        let default_opts = MetaOpts::test(false);
        Self::for_test_opts(default_opts, |_| {}).await
    }

    /// Creates an environment for tests with caller-supplied options.
    ///
    /// `on_test_system_params` is invoked once on the test system parameters so the caller can
    /// adjust them before the environment is built. The session config is the default one and
    /// the meta store comes from `SqlMetaStore::for_test()`.
    ///
    /// # Panics
    ///
    /// Panics if [`MetaSrvEnv::new`] returns an error.
    pub async fn for_test_opts(
        opts: MetaOpts,
        on_test_system_params: impl FnOnce(&mut risingwave_pb::meta::PbSystemParams),
    ) -> Self {
        let mut test_params = risingwave_common::system_param::system_params_for_test();
        on_test_system_params(&mut test_params);

        // Bound in the same order the original call evaluated its arguments.
        let session_config = SessionConfig::default();
        let meta_store = SqlMetaStore::for_test().await;

        let env = Self::new(opts, test_params, session_config, meta_store).await;
        env.unwrap()
    }
}